package clickhouseReader

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"math"
	"math/rand"
	"reflect"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/SigNoz/signoz/pkg/prometheus"
	"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/SigNoz/signoz/pkg/telemetrystore"
	"github.com/SigNoz/signoz/pkg/types"
	"github.com/SigNoz/signoz/pkg/valuer"
	"github.com/uptrace/bun"

	"github.com/google/uuid"
	"github.com/pkg/errors"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/util/stats"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/SigNoz/signoz/pkg/cache"
	"github.com/SigNoz/signoz/pkg/types/authtypes"

	"go.uber.org/zap"

	queryprogress "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader/query_progress"
	"github.com/SigNoz/signoz/pkg/query-service/app/logs"
	"github.com/SigNoz/signoz/pkg/query-service/app/resource"
	"github.com/SigNoz/signoz/pkg/query-service/app/services"
	"github.com/SigNoz/signoz/pkg/query-service/app/traces/smart"
	"github.com/SigNoz/signoz/pkg/query-service/app/traces/tracedetail"
	"github.com/SigNoz/signoz/pkg/query-service/common"
	"github.com/SigNoz/signoz/pkg/query-service/constants"
	chErrors "github.com/SigNoz/signoz/pkg/query-service/errors"
	"github.com/SigNoz/signoz/pkg/query-service/metrics"
	"github.com/SigNoz/signoz/pkg/query-service/model"
	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
	"github.com/SigNoz/signoz/pkg/query-service/telemetry"
	"github.com/SigNoz/signoz/pkg/query-service/utils"
)

const (
	primaryNamespace          = "clickhouse"
	archiveNamespace          = "clickhouse-archive"
	signozTraceDBName         = "signoz_traces"
	signozHistoryDBName       = "signoz_analytics"
	ruleStateHistoryTableName = "distributed_rule_state_history_v0"
	signozDurationMVTable     = "distributed_durationSort"
	signozUsageExplorerTable  = "distributed_usage_explorer"
	signozSpansTable          = "distributed_signoz_spans"
	signozErrorIndexTable     = "distributed_signoz_error_index_v2"
	signozTraceTableName      = "distributed_signoz_index_v2"
	signozTraceLocalTableName = "signoz_index_v2"
	signozMetricDBName        = "signoz_metrics"
	signozMetadataDbName      = "signoz_metadata"

	signozSampleLocalTableName = "samples_v4"
	signozSampleTableName      = "distributed_samples_v4"

	signozSamplesAgg5mLocalTableName = "samples_v4_agg_5m"
	signozSamplesAgg5mTableName      = "distributed_samples_v4_agg_5m"

	signozSamplesAgg30mLocalTableName = "samples_v4_agg_30m"
	signozSamplesAgg30mTableName      = "distributed_samples_v4_agg_30m"

	signozExpHistLocalTableName = "exp_hist"
	signozExpHistTableName      = "distributed_exp_hist"

	signozTSLocalTableNameV4 = "time_series_v4"
	signozTSTableNameV4      = "distributed_time_series_v4"

	signozTSLocalTableNameV46Hrs = "time_series_v4_6hrs"
	signozTSTableNameV46Hrs      = "distributed_time_series_v4_6hrs"

	signozTSLocalTableNameV41Day = "time_series_v4_1day"
	signozTSTableNameV41Day      = "distributed_time_series_v4_1day"

	signozTSLocalTableNameV41Week = "time_series_v4_1week"
	signozTSTableNameV41Week      = "distributed_time_series_v4_1week"

	signozTableAttributesMetadata      = "distributed_attributes_metadata"
	signozLocalTableAttributesMetadata = "attributes_metadata"

	signozUpdatedMetricsMetadataLocalTable = "updated_metadata"
	signozUpdatedMetricsMetadataTable      = "distributed_updated_metadata"
	minTimespanForProgressiveSearch        = time.Hour
	minTimespanForProgressiveSearchMargin  = time.Minute
	maxProgressiveSteps                    = 4
	charset                                = "abcdefghijklmnopqrstuvwxyz" +
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	NANOSECOND = 1000000000
)

var (
	ErrNoOperationsTable            = errors.New("no operations table supplied")
	ErrNoIndexTable                 = errors.New("no index table supplied")
	ErrStartTimeRequired            = errors.New("start time is required for search queries")
	seededRand           *rand.Rand = rand.New(
		rand.NewSource(time.Now().UnixNano()))
)

// SpanWriter for reading spans from ClickHouse
type ClickHouseReader struct {
	db                      clickhouse.Conn
	prometheus              prometheus.Prometheus
	sqlDB                   sqlstore.SQLStore
	TraceDB                 string
	operationsTable         string
	durationTable           string
	indexTable              string
	errorTable              string
	usageExplorerTable      string
	SpansTable              string
	spanAttributeTableV2    string
	spanAttributesKeysTable string
	dependencyGraphTable    string
	topLevelOperationsTable string
	logsDB                  string
	logsTable               string
	logsLocalTable          string
	logsAttributeKeys       string
	logsResourceKeys        string
	logsTagAttributeTableV2 string
	queryProgressTracker    queryprogress.QueryProgressTracker

	logsTableV2              string
	logsLocalTableV2         string
	logsResourceTableV2      string
	logsResourceLocalTableV2 string

	liveTailRefreshSeconds int
	cluster                string

	logsTableName      string
	logsLocalTableName string

	traceTableName       string
	traceLocalTableName  string
	traceResourceTableV3 string
	traceSummaryTable    string

	fluxIntervalForTraceDetail time.Duration
	cache                      cache.Cache
	metadataDB                 string
	metadataTable              string
}

// NewTraceReader returns a TraceReader for the database
func NewReader(
	sqlDB sqlstore.SQLStore,
	telemetryStore telemetrystore.TelemetryStore,
	prometheus prometheus.Prometheus,
	cluster string,
	fluxIntervalForTraceDetail time.Duration,
	cache cache.Cache,
) *ClickHouseReader {
	options := NewOptions(primaryNamespace, archiveNamespace)
	return NewReaderFromClickhouseConnection(options, sqlDB, telemetryStore, prometheus, cluster, fluxIntervalForTraceDetail, cache)
}

func NewReaderFromClickhouseConnection(
	options *Options,
	sqlDB sqlstore.SQLStore,
	telemetryStore telemetrystore.TelemetryStore,
	prometheus prometheus.Prometheus,
	cluster string,
	fluxIntervalForTraceDetail time.Duration,
	cache cache.Cache,
) *ClickHouseReader {
	logsTableName := options.primary.LogsTableV2
	logsLocalTableName := options.primary.LogsLocalTableV2
	traceTableName := options.primary.TraceIndexTableV3
	traceLocalTableName := options.primary.TraceLocalTableNameV3

	return &ClickHouseReader{
		db:                         telemetryStore.ClickhouseDB(),
		prometheus:                 prometheus,
		sqlDB:                      sqlDB,
		TraceDB:                    options.primary.TraceDB,
		operationsTable:            options.primary.OperationsTable,
		indexTable:                 options.primary.IndexTable,
		errorTable:                 options.primary.ErrorTable,
		usageExplorerTable:         options.primary.UsageExplorerTable,
		durationTable:              options.primary.DurationTable,
		SpansTable:                 options.primary.SpansTable,
		spanAttributeTableV2:       options.primary.SpanAttributeTableV2,
		spanAttributesKeysTable:    options.primary.SpanAttributeKeysTable,
		dependencyGraphTable:       options.primary.DependencyGraphTable,
		topLevelOperationsTable:    options.primary.TopLevelOperationsTable,
		logsDB:                     options.primary.LogsDB,
		logsTable:                  options.primary.LogsTable,
		logsLocalTable:             options.primary.LogsLocalTable,
		logsAttributeKeys:          options.primary.LogsAttributeKeysTable,
		logsResourceKeys:           options.primary.LogsResourceKeysTable,
		logsTagAttributeTableV2:    options.primary.LogsTagAttributeTableV2,
		liveTailRefreshSeconds:     options.primary.LiveTailRefreshSeconds,
		cluster:                    cluster,
		queryProgressTracker:       queryprogress.NewQueryProgressTracker(),
		logsTableV2:                options.primary.LogsTableV2,
		logsLocalTableV2:           options.primary.LogsLocalTableV2,
		logsResourceTableV2:        options.primary.LogsResourceTableV2,
		logsResourceLocalTableV2:   options.primary.LogsResourceLocalTableV2,
		logsTableName:              logsTableName,
		logsLocalTableName:         logsLocalTableName,
		traceLocalTableName:        traceLocalTableName,
		traceTableName:             traceTableName,
		traceResourceTableV3:       options.primary.TraceResourceTableV3,
		traceSummaryTable:          options.primary.TraceSummaryTable,
		fluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
		cache:                      cache,
		metadataDB:                 options.primary.MetadataDB,
		metadataTable:              options.primary.MetadataTable,
	}
}

func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
	qry, err := r.prometheus.Engine().NewInstantQuery(ctx, r.prometheus.Storage(), nil, queryParams.Query, queryParams.Time)
	if err != nil {
		return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
	}

	res := qry.Exec(ctx)

	// Optional stats field in response if parameter "stats" is not empty.
	var qs stats.QueryStats
	if queryParams.Stats != "" {
		qs = stats.NewQueryStats(qry.Stats())
	}

	qry.Close()
	return res, &qs, nil

}

func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
	qry, err := r.prometheus.Engine().NewRangeQuery(ctx, r.prometheus.Storage(), nil, query.Query, query.Start, query.End, query.Step)

	if err != nil {
		return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
	}

	res := qry.Exec(ctx)

	// Optional stats field in response if parameter "stats" is not empty.
	var qs stats.QueryStats
	if query.Stats != "" {
		qs = stats.NewQueryStats(qry.Stats())
	}

	qry.Close()
	return res, &qs, nil
}

func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
	services := []string{}
	rows, err := r.db.Query(ctx, fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now() - INTERVAL 1 DAY) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName))
	if err != nil {
		return nil, fmt.Errorf("error in processing sql query")
	}

	defer rows.Close()
	for rows.Next() {
		var serviceName string
		if err := rows.Scan(&serviceName); err != nil {
			return &services, err
		}
		services = append(services, serviceName)
	}

	return &services, nil
}

func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, start, end time.Time, services []string) (*map[string][]string, *model.ApiError) {
	start = start.In(time.UTC)

	// The `top_level_operations` that have `time` >= start
	operations := map[string][]string{}
	// We can't use the `end` because the `top_level_operations` table has the most recent instances of the operations
	// We can only use the `start` time to filter the operations
	query := fmt.Sprintf(`SELECT name, serviceName, max(time) as ts FROM %s.%s WHERE time >= @start`, r.TraceDB, r.topLevelOperationsTable)
	if len(services) > 0 {
		query += ` AND serviceName IN @services`
	}
	query += ` GROUP BY name, serviceName ORDER BY ts DESC LIMIT 5000`

	rows, err := r.db.Query(ctx, query, clickhouse.Named("start", start), clickhouse.Named("services", services))

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}

	defer rows.Close()
	for rows.Next() {
		var name, serviceName string
		var t time.Time
		if err := rows.Scan(&name, &serviceName, &t); err != nil {
			return nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error in reading data")}
		}
		if _, ok := operations[serviceName]; !ok {
			operations[serviceName] = []string{"overflow_operation"}
		}
		operations[serviceName] = append(operations[serviceName], name)
	}
	return &operations, nil
}

func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc string, start, end time.Time) (string, error) {
	// assuming all will be resource attributes.
	// and resource attributes are string for traces
	filterSet := v3.FilterSet{}
	for _, tag := range tags {
		// skip the collector id as we don't add it to traces
		if tag.Key == "signoz.collector.id" {
			continue
		}
		key := v3.AttributeKey{
			Key:      tag.Key,
			DataType: v3.AttributeKeyDataTypeString,
			Type:     v3.AttributeKeyTypeResource,
		}

		it := v3.FilterItem{
			Key: key,
		}

		// as of now only in and not in are supported
		switch tag.Operator {
		case model.NotInOperator:
			it.Operator = v3.FilterOperatorNotIn
			it.Value = tag.StringValues
		case model.InOperator:
			it.Operator = v3.FilterOperatorIn
			it.Value = tag.StringValues
		default:
			return "", fmt.Errorf("operator %s not supported", tag.Operator)
		}

		filterSet.Items = append(filterSet.Items, it)
	}
	filterSet.Items = append(filterSet.Items, v3.FilterItem{
		Key: v3.AttributeKey{
			Key:      "service.name",
			DataType: v3.AttributeKeyDataTypeString,
			Type:     v3.AttributeKeyTypeResource,
		},
		Operator: v3.FilterOperatorEqual,
		Value:    svc,
	})

	resourceSubQuery, err := resource.BuildResourceSubQuery(
		r.TraceDB,
		r.traceResourceTableV3,
		start.Unix()-1800,
		end.Unix(),
		&filterSet,
		[]v3.AttributeKey{},
		v3.AttributeKey{},
		false)
	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return "", err
	}
	return resourceSubQuery, nil
}

func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {

	if r.indexTable == "" {
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
	}

	topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
	if apiErr != nil {
		return nil, apiErr
	}

	serviceItems := []model.ServiceItem{}
	var wg sync.WaitGroup
	// limit the number of concurrent queries to not overload the clickhouse server
	sem := make(chan struct{}, 10)
	var mtx sync.RWMutex

	for svc, ops := range *topLevelOps {
		sem <- struct{}{}
		wg.Add(1)
		go func(svc string, ops []string) {
			defer wg.Done()
			defer func() { <-sem }()
			var serviceItem model.ServiceItem
			var numErrors uint64

			// Even if the total number of operations within the time range is less and the all
			// the top level operations are high, we want to warn to let user know the issue
			// with the instrumentation
			serviceItem.DataWarning = model.DataWarning{
				TopLevelOps: (*topLevelOps)[svc],
			}

			// default max_query_size = 262144
			// Let's assume the average size of the item in `ops` is 50 bytes
			// We can have 262144/50 = 5242 items in the `ops` array
			// Although we have make it as big as 5k, We cap the number of items
			// in the `ops` array to 1500

			ops = ops[:int(math.Min(1500, float64(len(ops))))]

			query := fmt.Sprintf(
				`SELECT
					quantile(0.99)(durationNano) as p99,
					avg(durationNano) as avgDuration,
					count(*) as numCalls
				FROM %s.%s
				WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
				r.TraceDB, r.traceTableName,
			)
			errorQuery := fmt.Sprintf(
				`SELECT
					count(*) as numErrors
				FROM %s.%s
				WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
				r.TraceDB, r.traceTableName,
			)

			args := []interface{}{}
			args = append(args,
				clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
				clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
				clickhouse.Named("serviceName", svc),
				clickhouse.Named("names", ops),
			)

			resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End)
			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return
			}
			query += `
					AND (
						resource_fingerprint GLOBAL IN ` +
				resourceSubQuery +
				`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

			args = append(args,
				clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
				clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
			)

			err = r.db.QueryRow(
				ctx,
				query,
				args...,
			).ScanStruct(&serviceItem)

			if serviceItem.NumCalls == 0 {
				return
			}

			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return
			}

			errorQuery += `
					AND (
						resource_fingerprint GLOBAL IN ` +
				resourceSubQuery +
				`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

			err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return
			}

			serviceItem.ServiceName = svc
			serviceItem.NumErrors = numErrors
			mtx.Lock()
			serviceItems = append(serviceItems, serviceItem)
			mtx.Unlock()
		}(svc, ops)
	}
	wg.Wait()

	for idx := range serviceItems {
		serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period)
		serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls)
	}
	return &serviceItems, nil
}

func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
	// status can only be two and if both are selected than they are equivalent to none selected
	if _, ok := excludeMap["status"]; ok {
		if len(statusParams) == 1 {
			if statusParams[0] == "error" {
				query += " AND hasError = false"
			} else if statusParams[0] == "ok" {
				query += " AND hasError = true"
			}
		}
	} else if len(statusParams) == 1 {
		if statusParams[0] == "error" {
			query += " AND hasError = true"
		} else if statusParams[0] == "ok" {
			query += " AND hasError = false"
		}
	}
	return query
}

func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
	tags := []model.TagQuery{}
	for _, tag := range queryParams {
		if len(tag.StringValues) > 0 {
			tags = append(tags, model.NewTagQueryString(tag))
		}
		if len(tag.NumberValues) > 0 {
			tags = append(tags, model.NewTagQueryNumber(tag))
		}
		if len(tag.BoolValues) > 0 {
			tags = append(tags, model.NewTagQueryBool(tag))
		}
	}
	return tags
}

func StringWithCharset(length int, charset string) string {
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[seededRand.Intn(len(charset))]
	}
	return string(b)
}

func String(length int) string {
	return StringWithCharset(length, charset)
}

func buildQueryWithTagParams(_ context.Context, tags []model.TagQuery) (string, []interface{}, *model.ApiError) {
	query := ""
	var args []interface{}
	for _, item := range tags {
		var subQuery string
		var argsSubQuery []interface{}
		tagMapType := item.GetTagMapColumn()
		switch item.GetOperator() {
		case model.EqualOperator:
			subQuery, argsSubQuery = addArithmeticOperator(item, tagMapType, "=")
		case model.NotEqualOperator:
			subQuery, argsSubQuery = addArithmeticOperator(item, tagMapType, "!=")
		case model.LessThanOperator:
			subQuery, argsSubQuery = addArithmeticOperator(item, tagMapType, "<")
		case model.GreaterThanOperator:
			subQuery, argsSubQuery = addArithmeticOperator(item, tagMapType, ">")
		case model.InOperator:
			subQuery, argsSubQuery = addInOperator(item, tagMapType, false)
		case model.NotInOperator:
			subQuery, argsSubQuery = addInOperator(item, tagMapType, true)
		case model.LessThanEqualOperator:
			subQuery, argsSubQuery = addArithmeticOperator(item, tagMapType, "<=")
		case model.GreaterThanEqualOperator:
			subQuery, argsSubQuery = addArithmeticOperator(item, tagMapType, ">=")
		case model.ContainsOperator:
			subQuery, argsSubQuery = addContainsOperator(item, tagMapType, false)
		case model.NotContainsOperator:
			subQuery, argsSubQuery = addContainsOperator(item, tagMapType, true)
		case model.StartsWithOperator:
			subQuery, argsSubQuery = addStartsWithOperator(item, tagMapType, false)
		case model.NotStartsWithOperator:
			subQuery, argsSubQuery = addStartsWithOperator(item, tagMapType, true)
		case model.ExistsOperator:
			subQuery, argsSubQuery = addExistsOperator(item, tagMapType, false)
		case model.NotExistsOperator:
			subQuery, argsSubQuery = addExistsOperator(item, tagMapType, true)
		default:
			return "", nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("filter operator %s not supported", item.GetOperator())}
		}
		query += subQuery
		args = append(args, argsSubQuery...)
	}
	return query, args, nil
}

func addInOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
	values := item.GetValues()
	args := []interface{}{}
	notStr := ""
	if not {
		notStr = "NOT"
	}
	tagValuePair := []string{}
	for _, value := range values {
		tagKey := "inTagKey" + String(5)
		tagValue := "inTagValue" + String(5)
		tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] = @%s", tagMapType, tagKey, tagValue))
		args = append(args, clickhouse.Named(tagKey, item.GetKey()))
		args = append(args, clickhouse.Named(tagValue, value))
	}
	return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args
}

func addContainsOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
	values := item.GetValues()
	args := []interface{}{}
	notStr := ""
	if not {
		notStr = "NOT"
	}
	tagValuePair := []string{}
	for _, value := range values {
		tagKey := "containsTagKey" + String(5)
		tagValue := "containsTagValue" + String(5)
		tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] ILIKE @%s", tagMapType, tagKey, tagValue))
		args = append(args, clickhouse.Named(tagKey, item.GetKey()))
		args = append(args, clickhouse.Named(tagValue, "%"+fmt.Sprintf("%v", value)+"%"))
	}
	return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args
}

func addStartsWithOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
	values := item.GetValues()
	args := []interface{}{}
	notStr := ""
	if not {
		notStr = "NOT"
	}
	tagValuePair := []string{}
	for _, value := range values {
		tagKey := "startsWithTagKey" + String(5)
		tagValue := "startsWithTagValue" + String(5)
		tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] ILIKE @%s", tagMapType, tagKey, tagValue))
		args = append(args, clickhouse.Named(tagKey, item.GetKey()))
		args = append(args, clickhouse.Named(tagValue, "%"+fmt.Sprintf("%v", value)+"%"))
	}
	return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args
}

func addArithmeticOperator(item model.TagQuery, tagMapType string, operator string) (string, []interface{}) {
	values := item.GetValues()
	args := []interface{}{}
	tagValuePair := []string{}
	for _, value := range values {
		tagKey := "arithmeticTagKey" + String(5)
		tagValue := "arithmeticTagValue" + String(5)
		tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] %s @%s", tagMapType, tagKey, operator, tagValue))
		args = append(args, clickhouse.Named(tagKey, item.GetKey()))
		args = append(args, clickhouse.Named(tagValue, value))
	}
	return fmt.Sprintf(" AND (%s)", strings.Join(tagValuePair, " OR ")), args
}

func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
	values := item.GetValues()
	notStr := ""
	if not {
		notStr = "NOT"
	}
	args := []interface{}{}
	tagOperatorPair := []string{}
	for range values {
		tagKey := "existsTagKey" + String(5)
		tagOperatorPair = append(tagOperatorPair, fmt.Sprintf("mapContains(%s, @%s)", tagMapType, tagKey))
		args = append(args, clickhouse.Named(tagKey, item.GetKey()))
	}
	return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
}

func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {

	namedArgs := []interface{}{
		clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
		clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
		clickhouse.Named("serviceName", queryParams.ServiceName),
		clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
		clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
	}

	var topOperationsItems []model.TopOperationsItem

	query := fmt.Sprintf(`
		SELECT
			quantile(0.5)(durationNano) as p50,
			quantile(0.95)(durationNano) as p95,
			quantile(0.99)(durationNano) as p99,
			COUNT(*) as numCalls,
			countIf(statusCode=2) as errorCount,
			name
		FROM %s.%s
		WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`,
		r.TraceDB, r.traceTableName,
	)

	resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, queryParams.ServiceName, *queryParams.Start, *queryParams.End)
	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}
	query += `
			AND (
				resource_fingerprint GLOBAL IN ` +
		resourceSubQuery +
		`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

	query += " GROUP BY name ORDER BY p99 DESC"
	if queryParams.Limit > 0 {
		query += " LIMIT @limit"
		namedArgs = append(namedArgs, clickhouse.Named("limit", queryParams.Limit))
	}
	err = r.db.Select(ctx, &topOperationsItems, query, namedArgs...)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}

	if topOperationsItems == nil {
		topOperationsItems = []model.TopOperationsItem{}
	}

	return &topOperationsItems, nil
}

func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) {

	var usageItems []model.UsageItem
	namedArgs := []interface{}{
		clickhouse.Named("interval", queryParams.StepHour),
		clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
		clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
	}
	var query string
	if len(queryParams.ServiceName) != 0 {
		namedArgs = append(namedArgs, clickhouse.Named("serviceName", queryParams.ServiceName))
		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL @interval HOUR) as time, sum(count) as count FROM %s.%s WHERE service_name=@serviceName AND timestamp>=@start AND timestamp<=@end GROUP BY time ORDER BY time ASC", r.TraceDB, r.usageExplorerTable)
	} else {
		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL @interval HOUR) as time, sum(count) as count FROM %s.%s WHERE timestamp>=@start AND timestamp<=@end GROUP BY time ORDER BY time ASC", r.TraceDB, r.usageExplorerTable)
	}

	err := r.db.Select(ctx, &usageItems, query, namedArgs...)

	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, fmt.Errorf("error in processing sql query")
	}

	for i := range usageItems {
		usageItems[i].Timestamp = uint64(usageItems[i].Time.UnixNano())
	}

	if usageItems == nil {
		usageItems = []model.UsageItem{}
	}

	return &usageItems, nil
}

func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string, traceDetailsQuery string) ([]model.SpanItemV2, *model.ApiError) {
	var traceSummary model.TraceSummary
	summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable)
	err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans)
	if err != nil {
		if err == sql.ErrNoRows {
			return []model.SpanItemV2{}, nil
		}
		zap.L().Error("Error in processing trace summary sql query", zap.Error(err))
		return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err))
	}

	var searchScanResponses []model.SpanItemV2
	queryStartTime := time.Now()
	err = r.db.Select(ctx, &searchScanResponses, traceDetailsQuery, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10))
	zap.L().Info(traceDetailsQuery)
	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err))
	}
	zap.L().Info("trace details query took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID))

	return searchScanResponses, nil
}

func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx context.Context, orgID valuer.UUID, traceID string) (*model.GetWaterfallSpansForTraceWithMetadataCache, error) {
	cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache)
	err := r.cache.Get(ctx, orgID, strings.Join([]string{"getWaterfallSpansForTraceWithMetadata", traceID}, "-"), cachedTraceData, false)
	if err != nil {
		zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err), zap.String("traceID", traceID))
		return nil, err
	}

	if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail {
		zap.L().Info("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache", zap.String("traceID", traceID))
		return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache, traceID: %s", traceID)
	}

	zap.L().Info("cache is successfully hit, applying cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID))
	return cachedTraceData, nil
}

func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) {
	response := new(model.GetWaterfallSpansForTraceWithMetadataResponse)
	var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64
	var spanIdToSpanNodeMap = map[string]*model.Span{}
	var traceRoots []*model.Span
	var serviceNameToTotalDurationMap = map[string]uint64{}
	var serviceNameIntervalMap = map[string][]tracedetail.Interval{}
	var hasMissingSpans bool

	claims, errv2 := authtypes.ClaimsFromContext(ctx)
	cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, orgID, traceID)
	if err == nil {
		startTime = cachedTraceData.StartTime
		endTime = cachedTraceData.EndTime
		durationNano = cachedTraceData.DurationNano
		spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap
		serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap
		traceRoots = cachedTraceData.TraceRoots
		totalSpans = cachedTraceData.TotalSpans
		totalErrorSpans = cachedTraceData.TotalErrorSpans
		hasMissingSpans = cachedTraceData.HasMissingSpans

		if errv2 == nil {
			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, map[string]interface{}{"traceSize": totalSpans}, claims.Email, true, false)
		}
	}

	if err != nil {
		zap.L().Info("cache miss for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID))

		searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT DISTINCT ON (span_id) timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
		if err != nil {
			return nil, err
		}
		if len(searchScanResponses) == 0 {
			return response, nil
		}
		totalSpans = uint64(len(searchScanResponses))

		if errv2 == nil {
			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, map[string]interface{}{"traceSize": totalSpans}, claims.Email, true, false)
		}

		processingBeforeCache := time.Now()
		for _, item := range searchScanResponses {
			ref := []model.OtelSpanRef{}
			err := json.Unmarshal([]byte(item.References), &ref)
			if err != nil {
				zap.L().Error("getWaterfallSpansForTraceWithMetadata: error unmarshalling references", zap.Error(err), zap.String("traceID", traceID))
				return nil, model.BadRequest(fmt.Errorf("getWaterfallSpansForTraceWithMetadata: error unmarshalling references %w", err))
			}

			// merge attributes_number and attributes_bool to attributes_string
			for k, v := range item.Attributes_bool {
				item.Attributes_string[k] = fmt.Sprintf("%v", v)
			}
			for k, v := range item.Attributes_number {
				item.Attributes_string[k] = strconv.FormatFloat(v, 'f', -1, 64)
			}
			for k, v := range item.Resources_string {
				item.Attributes_string[k] = v
			}

			jsonItem := model.Span{
				SpanID:           item.SpanID,
				TraceID:          item.TraceID,
				ServiceName:      item.ServiceName,
				Name:             item.Name,
				Kind:             int32(item.Kind),
				DurationNano:     item.DurationNano,
				HasError:         item.HasError,
				StatusMessage:    item.StatusMessage,
				StatusCodeString: item.StatusCodeString,
				SpanKind:         item.SpanKind,
				References:       ref,
				Events:           item.Events,
				TagMap:           item.Attributes_string,
				Children:         make([]*model.Span, 0),
			}

			// metadata calculation
			startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
			if startTime == 0 || startTimeUnixNano < startTime {
				startTime = startTimeUnixNano
			}
			if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
				endTime = (startTimeUnixNano + jsonItem.DurationNano)
			}
			if durationNano == 0 || jsonItem.DurationNano > durationNano {
				durationNano = jsonItem.DurationNano
			}

			if jsonItem.HasError {
				totalErrorSpans = totalErrorSpans + 1
			}

			// convert start timestamp to millis because right now frontend is expecting it in millis
			jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)

			// collect the intervals for service for execution time calculation
			serviceNameIntervalMap[jsonItem.ServiceName] =
				append(serviceNameIntervalMap[jsonItem.ServiceName], tracedetail.Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano / 1000000, Service: jsonItem.ServiceName})

			// append to the span node map
			spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
		}

		// traverse through the map and append each node to the children array of the parent node
		// and add the missing spans
		for _, spanNode := range spanIdToSpanNodeMap {
			hasParentSpanNode := false
			for _, reference := range spanNode.References {
				if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
					hasParentSpanNode = true

					if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
						parentNode.Children = append(parentNode.Children, spanNode)
					} else {
						// insert the missing span
						missingSpan := model.Span{
							SpanID:           reference.SpanId,
							TraceID:          spanNode.TraceID,
							ServiceName:      "",
							Name:             "Missing Span",
							TimeUnixNano:     spanNode.TimeUnixNano,
							Kind:             0,
							DurationNano:     spanNode.DurationNano,
							HasError:         false,
							StatusMessage:    "",
							StatusCodeString: "",
							SpanKind:         "",
							Children:         make([]*model.Span, 0),
						}
						missingSpan.Children = append(missingSpan.Children, spanNode)
						spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
						traceRoots = append(traceRoots, &missingSpan)
						hasMissingSpans = true
					}
				}
			}
			if !hasParentSpanNode && !tracedetail.ContainsWaterfallSpan(traceRoots, spanNode) {
				traceRoots = append(traceRoots, spanNode)
			}
		}

		// sort the trace roots to add missing spans at the right order
		sort.Slice(traceRoots, func(i, j int) bool {
			if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano {
				return traceRoots[i].Name < traceRoots[j].Name
			}
			return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano
		})

		serviceNameToTotalDurationMap = tracedetail.CalculateServiceTime(serviceNameIntervalMap)

		traceCache := model.GetWaterfallSpansForTraceWithMetadataCache{
			StartTime:                     startTime,
			EndTime:                       endTime,
			DurationNano:                  durationNano,
			TotalSpans:                    totalSpans,
			TotalErrorSpans:               totalErrorSpans,
			SpanIdToSpanNodeMap:           spanIdToSpanNodeMap,
			ServiceNameToTotalDurationMap: serviceNameToTotalDurationMap,
			TraceRoots:                    traceRoots,
			HasMissingSpans:               hasMissingSpans,
		}

		zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID))
		cacheErr := r.cache.Set(ctx, orgID, strings.Join([]string{"getWaterfallSpansForTraceWithMetadata", traceID}, "-"), &traceCache, time.Minute*5)
		if cacheErr != nil {
			zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err))
		}
	}

	processingPostCache := time.Now()
	selectedSpans, uncollapsedSpans, rootServiceName, rootServiceEntryPoint := tracedetail.GetSelectedSpans(req.UncollapsedSpans, req.SelectedSpanID, traceRoots, spanIdToSpanNodeMap, req.IsSelectedSpanIDUnCollapsed)
	zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))

	response.Spans = selectedSpans
	response.UncollapsedSpans = uncollapsedSpans
	response.StartTimestampMillis = startTime / 1000000
	response.EndTimestampMillis = endTime / 1000000
	response.TotalSpansCount = totalSpans
	response.TotalErrorSpansCount = totalErrorSpans
	response.RootServiceName = rootServiceName
	response.RootServiceEntryPoint = rootServiceEntryPoint
	response.ServiceNameToTotalDurationMap = serviceNameToTotalDurationMap
	response.HasMissingSpans = hasMissingSpans
	return response, nil
}

func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context, orgID valuer.UUID, traceID string) (*model.GetFlamegraphSpansForTraceCache, error) {
	cachedTraceData := new(model.GetFlamegraphSpansForTraceCache)
	err := r.cache.Get(ctx, orgID, strings.Join([]string{"getFlamegraphSpansForTrace", traceID}, "-"), cachedTraceData, false)
	if err != nil {
		zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err), zap.String("traceID", traceID))
		return nil, err
	}

	if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail {
		zap.L().Info("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache", zap.String("traceID", traceID))
		return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache, traceID: %s", traceID)
	}

	zap.L().Info("cache is successfully hit, applying cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID))
	return cachedTraceData, nil
}

func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) {
	trace := new(model.GetFlamegraphSpansForTraceResponse)
	var startTime, endTime, durationNano uint64
	var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{}
	// map[traceID][level]span
	var selectedSpans = [][]*model.FlamegraphSpan{}
	var traceRoots []*model.FlamegraphSpan

	// get the trace tree from cache!
	cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, orgID, traceID)

	if err == nil {
		startTime = cachedTraceData.StartTime
		endTime = cachedTraceData.EndTime
		durationNano = cachedTraceData.DurationNano
		selectedSpans = cachedTraceData.SelectedSpans
		traceRoots = cachedTraceData.TraceRoots
	}

	if err != nil {
		zap.L().Info("cache miss for getFlamegraphSpansForTrace", zap.String("traceID", traceID))

		searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
		if err != nil {
			return nil, err
		}
		if len(searchScanResponses) == 0 {
			return trace, nil
		}

		processingBeforeCache := time.Now()
		for _, item := range searchScanResponses {
			ref := []model.OtelSpanRef{}
			err := json.Unmarshal([]byte(item.References), &ref)
			if err != nil {
				zap.L().Error("Error unmarshalling references", zap.Error(err))
				return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)}
			}

			jsonItem := model.FlamegraphSpan{
				SpanID:       item.SpanID,
				TraceID:      item.TraceID,
				ServiceName:  item.ServiceName,
				Name:         item.Name,
				DurationNano: item.DurationNano,
				HasError:     item.HasError,
				References:   ref,
				Children:     make([]*model.FlamegraphSpan, 0),
			}

			// metadata calculation
			startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
			if startTime == 0 || startTimeUnixNano < startTime {
				startTime = startTimeUnixNano
			}
			if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
				endTime = (startTimeUnixNano + jsonItem.DurationNano)
			}
			if durationNano == 0 || jsonItem.DurationNano > durationNano {
				durationNano = jsonItem.DurationNano
			}

			jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
			spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
		}

		// traverse through the map and append each node to the children array of the parent node
		// and add missing spans
		for _, spanNode := range spanIdToSpanNodeMap {
			hasParentSpanNode := false
			for _, reference := range spanNode.References {
				if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
					hasParentSpanNode = true
					if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
						parentNode.Children = append(parentNode.Children, spanNode)
					} else {
						// insert the missing spans
						missingSpan := model.FlamegraphSpan{
							SpanID:       reference.SpanId,
							TraceID:      spanNode.TraceID,
							ServiceName:  "",
							Name:         "Missing Span",
							TimeUnixNano: spanNode.TimeUnixNano,
							DurationNano: spanNode.DurationNano,
							HasError:     false,
							Children:     make([]*model.FlamegraphSpan, 0),
						}
						missingSpan.Children = append(missingSpan.Children, spanNode)
						spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
						traceRoots = append(traceRoots, &missingSpan)
					}
				}
			}
			if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) {
				traceRoots = append(traceRoots, spanNode)
			}
		}

		selectedSpans = tracedetail.GetSelectedSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
		traceCache := model.GetFlamegraphSpansForTraceCache{
			StartTime:     startTime,
			EndTime:       endTime,
			DurationNano:  durationNano,
			SelectedSpans: selectedSpans,
			TraceRoots:    traceRoots,
		}

		zap.L().Info("getFlamegraphSpansForTrace: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID))
		cacheErr := r.cache.Set(ctx, orgID, strings.Join([]string{"getFlamegraphSpansForTrace", traceID}, "-"), &traceCache, time.Minute*5)
		if cacheErr != nil {
			zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID), zap.Error(err))
		}
	}

	processingPostCache := time.Now()
	selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans, startTime, endTime)
	zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))

	trace.Spans = selectedSpansForRequest
	trace.StartTimestampMillis = startTime / 1000000
	trace.EndTimestampMillis = endTime / 1000000
	return trace, nil
}

func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {

	response := []model.ServiceMapDependencyResponseItem{}

	args := []interface{}{}
	args = append(args,
		clickhouse.Named("start", uint64(queryParams.Start.Unix())),
		clickhouse.Named("end", uint64(queryParams.End.Unix())),
		clickhouse.Named("duration", uint64(queryParams.End.Unix()-queryParams.Start.Unix())),
	)

	query := fmt.Sprintf(`
		WITH
			quantilesMergeState(0.5, 0.75, 0.9, 0.95, 0.99)(duration_quantiles_state) AS duration_quantiles_state,
			finalizeAggregation(duration_quantiles_state) AS result
		SELECT
			src as parent,
			dest as child,
			result[1] AS p50,
			result[2] AS p75,
			result[3] AS p90,
			result[4] AS p95,
			result[5] AS p99,
			sum(total_count) as callCount,
			sum(total_count)/ @duration AS callRate,
			sum(error_count)/sum(total_count) * 100 as errorRate
		FROM %s.%s
		WHERE toUInt64(toDateTime(timestamp)) >= @start AND toUInt64(toDateTime(timestamp)) <= @end`,
		r.TraceDB, r.dependencyGraphTable,
	)

	tags := createTagQueryFromTagQueryParams(queryParams.Tags)
	filterQuery, filterArgs := services.BuildServiceMapQuery(tags)
	query += filterQuery + " GROUP BY src, dest;"
	args = append(args, filterArgs...)

	zap.L().Debug("GetDependencyGraph query", zap.String("query", query), zap.Any("args", args))

	err := r.db.Select(ctx, &response, query, args...)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, fmt.Errorf("error in processing sql query %w", err)
	}

	return &response, nil
}

func getLocalTableName(tableName string) string {

	tableNameSplit := strings.Split(tableName, ".")
	return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]

}

func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
	// uuid is used as transaction id
	uuidWithHyphen := uuid.New()
	uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)

	coldStorageDuration := -1
	if len(params.ColdStorageVolume) > 0 {
		coldStorageDuration = int(params.ToColdStorageDuration)
	}

	tableNameArray := []string{r.logsDB + "." + r.logsLocalTableV2, r.logsDB + "." + r.logsResourceLocalTableV2}

	// check if there is existing things to be done
	for _, tableName := range tableNameArray {
		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
		if err != nil {
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
		}
		if statusItem.Status == constants.StatusPending {
			return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
		}
	}

	// TTL query for logs_v2 table
	ttlLogsV2 := fmt.Sprintf(
		"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
			"INTERVAL %v SECOND DELETE", tableNameArray[0], r.cluster, params.DelDuration)
	if len(params.ColdStorageVolume) > 0 {
		ttlLogsV2 += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
			" + INTERVAL %v SECOND TO VOLUME '%s'",
			params.ToColdStorageDuration, params.ColdStorageVolume)
	}

	// TTL query for logs_v2_resource table
	// adding 1800 as our bucket size is 1800 seconds
	ttlLogsV2Resource := fmt.Sprintf(
		"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
			"INTERVAL %v SECOND DELETE", tableNameArray[1], r.cluster, params.DelDuration)
	if len(params.ColdStorageVolume) > 0 {
		ttlLogsV2Resource += fmt.Sprintf(", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
			"INTERVAL %v SECOND TO VOLUME '%s'",
			params.ToColdStorageDuration, params.ColdStorageVolume)
	}

	ttlPayload := map[string]string{
		tableNameArray[0]: ttlLogsV2,
		tableNameArray[1]: ttlLogsV2Resource,
	}

	// set the ttl if nothing is pending/ no errors
	go func(ttlPayload map[string]string) {
		for tableName, query := range ttlPayload {
			// https://github.com/SigNoz/signoz/issues/5470
			// we will change ttl for only the new parts and not the old ones
			query += " SETTINGS materialize_ttl_after_modify=0"

			ttl := types.TTLSetting{
				Identifiable: types.Identifiable{
					ID: valuer.GenerateUUID(),
				},
				TimeAuditable: types.TimeAuditable{
					CreatedAt: time.Now(),
					UpdatedAt: time.Now(),
				},
				TransactionID:  uuid,
				TableName:      tableName,
				TTL:            int(params.DelDuration),
				Status:         constants.StatusPending,
				ColdStorageTTL: coldStorageDuration,
				OrgID:          orgID,
			}
			_, dbErr := r.
				sqlDB.
				BunDB().
				NewInsert().
				Model(&ttl).
				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
				return
			}

			err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
			if err != nil {
				zap.L().Error("error in setting cold storage", zap.Error(err))
				statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
				if err == nil {
					_, dbErr := r.
						sqlDB.
						BunDB().
						NewUpdate().
						Model(new(types.TTLSetting)).
						Set("updated_at = ?", time.Now()).
						Set("status = ?", constants.StatusFailed).
						Where("id = ?", statusItem.ID.StringValue()).
						Exec(ctx)
					if dbErr != nil {
						zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
						return
					}
				}
				return
			}
			zap.L().Info("Executing TTL request: ", zap.String("request", query))
			statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err := r.db.Exec(ctx, query); err != nil {
				zap.L().Error("error while setting ttl", zap.Error(err))
				_, dbErr := r.
					sqlDB.
					BunDB().
					NewUpdate().
					Model(new(types.TTLSetting)).
					Set("updated_at = ?", time.Now()).
					Set("status = ?", constants.StatusFailed).
					Where("id = ?", statusItem.ID.StringValue()).
					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
				}
				return
			}
			_, dbErr = r.
				sqlDB.
				BunDB().
				NewUpdate().
				Model(new(types.TTLSetting)).
				Set("updated_at = ?", time.Now()).
				Set("status = ?", constants.StatusSuccess).
				Where("id = ?", statusItem.ID.StringValue()).
				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
				return
			}
		}

	}(ttlPayload)
	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}

func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
	// uuid is used as transaction id
	uuidWithHyphen := uuid.New()
	uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
	tableNames := []string{
		r.TraceDB + "." + r.traceTableName,
		r.TraceDB + "." + r.traceResourceTableV3,
		r.TraceDB + "." + signozErrorIndexTable,
		r.TraceDB + "." + signozUsageExplorerTable,
		r.TraceDB + "." + defaultDependencyGraphTable,
		r.TraceDB + "." + r.traceSummaryTable,
	}

	coldStorageDuration := -1
	if len(params.ColdStorageVolume) > 0 {
		coldStorageDuration = int(params.ToColdStorageDuration)
	}

	// check if there is existing things to be done
	for _, tableName := range tableNames {
		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
		if err != nil {
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
		}
		if statusItem.Status == constants.StatusPending {
			return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
		}
	}

	// TTL query
	ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL %v SECOND DELETE"
	ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL %v SECOND TO VOLUME '%s'"

	// TTL query for resource table
	ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND DELETE"
	ttlTracesV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'"

	for _, distributedTableName := range tableNames {
		go func(distributedTableName string) {
			tableName := getLocalTableName(distributedTableName)

			// for trace summary table, we need to use end instead of timestamp
			timestamp := "timestamp"
			if strings.HasSuffix(distributedTableName, r.traceSummaryTable) {
				timestamp = "end"
			}

			ttl := types.TTLSetting{
				Identifiable: types.Identifiable{
					ID: valuer.GenerateUUID(),
				},
				TimeAuditable: types.TimeAuditable{
					CreatedAt: time.Now(),
					UpdatedAt: time.Now(),
				},
				TransactionID:  uuid,
				TableName:      tableName,
				TTL:            int(params.DelDuration),
				Status:         constants.StatusPending,
				ColdStorageTTL: coldStorageDuration,
				OrgID:          orgID,
			}
			_, dbErr := r.
				sqlDB.
				BunDB().
				NewInsert().
				Model(&ttl).
				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
				return
			}

			req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration)
			if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
				req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration)
			}

			if len(params.ColdStorageVolume) > 0 {
				if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
					req += fmt.Sprintf(ttlTracesV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume)
				} else {
					req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ToColdStorageDuration, params.ColdStorageVolume)
				}
			}
			err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
			if err != nil {
				zap.L().Error("Error in setting cold storage", zap.Error(err))
				statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
				if err == nil {
					_, dbErr := r.
						sqlDB.
						BunDB().
						NewUpdate().
						Model(new(types.TTLSetting)).
						Set("updated_at = ?", time.Now()).
						Set("status = ?", constants.StatusFailed).
						Where("id = ?", statusItem.ID.StringValue()).
						Exec(ctx)
					if dbErr != nil {
						zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
						return
					}
				}
				return
			}
			req += " SETTINGS materialize_ttl_after_modify=0;"
			zap.L().Error(" ExecutingTTL request: ", zap.String("request", req))
			statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err := r.db.Exec(ctx, req); err != nil {
				zap.L().Error("Error in executing set TTL query", zap.Error(err))
				_, dbErr := r.
					sqlDB.
					BunDB().
					NewUpdate().
					Model(new(types.TTLSetting)).
					Set("updated_at = ?", time.Now()).
					Set("status = ?", constants.StatusFailed).
					Where("id = ?", statusItem.ID.StringValue()).
					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
				}
				return
			}
			_, dbErr = r.
				sqlDB.
				BunDB().
				NewUpdate().
				Model(new(types.TTLSetting)).
				Set("updated_at = ?", time.Now()).
				Set("status = ?", constants.StatusSuccess).
				Where("id = ?", statusItem.ID.StringValue()).
				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
				return
			}
		}(distributedTableName)
	}
	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}

// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
	// Keep only latest 100 transactions/requests
	r.deleteTtlTransactions(ctx, orgID, 100)
	// uuid is used as transaction id
	uuidWithHyphen := uuid.New()
	uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)

	coldStorageDuration := -1
	if len(params.ColdStorageVolume) > 0 {
		coldStorageDuration = int(params.ToColdStorageDuration)
	}

	switch params.Type {
	case constants.TraceTTL:
		return r.setTTLTraces(ctx, orgID, params)
	case constants.MetricsTTL:
		tableNames := []string{
			signozMetricDBName + "." + signozSampleLocalTableName,
			signozMetricDBName + "." + signozSamplesAgg5mLocalTableName,
			signozMetricDBName + "." + signozSamplesAgg30mLocalTableName,
			signozMetricDBName + "." + signozExpHistLocalTableName,
			signozMetricDBName + "." + signozTSLocalTableNameV4,
			signozMetricDBName + "." + signozTSLocalTableNameV46Hrs,
			signozMetricDBName + "." + signozTSLocalTableNameV41Day,
			signozMetricDBName + "." + signozTSLocalTableNameV41Week,
		}
		for _, tableName := range tableNames {
			statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err != nil {
				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
			}
			if statusItem.Status == constants.StatusPending {
				return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
			}
		}
		metricTTL := func(tableName string) {
			ttl := types.TTLSetting{
				Identifiable: types.Identifiable{
					ID: valuer.GenerateUUID(),
				},
				TimeAuditable: types.TimeAuditable{
					CreatedAt: time.Now(),
					UpdatedAt: time.Now(),
				},
				TransactionID:  uuid,
				TableName:      tableName,
				TTL:            int(params.DelDuration),
				Status:         constants.StatusPending,
				ColdStorageTTL: coldStorageDuration,
				OrgID:          orgID,
			}
			_, dbErr := r.
				sqlDB.
				BunDB().
				NewInsert().
				Model(&ttl).
				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
				return
			}
			timeColumn := "timestamp_ms"
			if strings.Contains(tableName, "v4") || strings.Contains(tableName, "exp_hist") {
				timeColumn = "unix_milli"
			}

			req := fmt.Sprintf(
				"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(%s / 1000), 'UTC') + "+
					"INTERVAL %v SECOND DELETE", tableName, r.cluster, timeColumn, params.DelDuration)
			if len(params.ColdStorageVolume) > 0 {
				req += fmt.Sprintf(", toDateTime(toUInt32(%s / 1000), 'UTC')"+
					" + INTERVAL %v SECOND TO VOLUME '%s'",
					timeColumn, params.ToColdStorageDuration, params.ColdStorageVolume)
			}
			err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
			if err != nil {
				zap.L().Error("Error in setting cold storage", zap.Error(err))
				statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
				if err == nil {
					_, dbErr := r.
						sqlDB.
						BunDB().
						NewUpdate().
						Model(new(types.TTLSetting)).
						Set("updated_at = ?", time.Now()).
						Set("status = ?", constants.StatusFailed).
						Where("id = ?", statusItem.ID.StringValue()).
						Exec(ctx)
					if dbErr != nil {
						zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
						return
					}
				}
				return
			}
			req += " SETTINGS materialize_ttl_after_modify=0"
			zap.L().Info("Executing TTL request: ", zap.String("request", req))
			statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err := r.db.Exec(ctx, req); err != nil {
				zap.L().Error("error while setting ttl.", zap.Error(err))
				_, dbErr := r.
					sqlDB.
					BunDB().
					NewUpdate().
					Model(new(types.TTLSetting)).
					Set("updated_at = ?", time.Now()).
					Set("status = ?", constants.StatusFailed).
					Where("id = ?", statusItem.ID.StringValue()).
					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
				}
				return
			}
			_, dbErr = r.
				sqlDB.
				BunDB().
				NewUpdate().
				Model(new(types.TTLSetting)).
				Set("updated_at = ?", time.Now()).
				Set("status = ?", constants.StatusSuccess).
				Where("id = ?", statusItem.ID.StringValue()).
				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
				return
			}
		}
		for _, tableName := range tableNames {
			go metricTTL(tableName)
		}
	case constants.LogsTTL:
		return r.setTTLLogs(ctx, orgID, params)

	default:
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
	}

	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}

func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID string, numberOfTransactionsStore int) {
	limitTransactions := []string{}
	err := r.
		sqlDB.
		BunDB().
		NewSelect().
		ColumnExpr("distinct(transaction_id)").
		Model(new(types.TTLSetting)).
		Where("org_id = ?", orgID).
		OrderExpr("created_at DESC").
		Limit(numberOfTransactionsStore).
		Scan(ctx, &limitTransactions)

	if err != nil {
		zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err))
	}

	_, err = r.
		sqlDB.
		BunDB().
		NewDelete().
		Model(new(types.TTLSetting)).
		Where("transaction_id NOT IN (?)", bun.In(limitTransactions)).
		Exec(ctx)
	if err != nil {
		zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err))
	}
}

// checkTTLStatusItem checks if ttl_status table has an entry for the given table name
func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, *model.ApiError) {
	zap.L().Info("checkTTLStatusItem query", zap.String("tableName", tableName))
	ttl := new(types.TTLSetting)
	err := r.
		sqlDB.
		BunDB().
		NewSelect().
		Model(ttl).
		Where("table_name = ?", tableName).
		Where("org_id = ?", orgID).
		OrderExpr("created_at DESC").
		Limit(1).
		Scan(ctx)
	if err != nil && err != sql.ErrNoRows {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return ttl, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
	}
	return ttl, nil
}

// setTTLQueryStatus fetches ttl_status table status from DB
func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, orgID string, tableNameArray []string) (string, *model.ApiError) {
	failFlag := false
	status := constants.StatusSuccess
	for _, tableName := range tableNameArray {
		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
		emptyStatusStruct := new(types.TTLSetting)
		if statusItem == emptyStatusStruct {
			return "", nil
		}
		if err != nil {
			return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
		}
		if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
			status = constants.StatusPending
			return status, nil
		}
		if statusItem.Status == constants.StatusFailed {
			failFlag = true
		}
	}
	if failFlag {
		status = constants.StatusFailed
	}

	return status, nil
}

func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string, coldStorageVolume string) *model.ApiError {

	// Set the storage policy for the required table. If it is already set, then setting it again
	// will not a problem.
	if len(coldStorageVolume) > 0 {
		policyReq := fmt.Sprintf("ALTER TABLE %s ON CLUSTER %s MODIFY SETTING storage_policy='tiered'", tableName, r.cluster)

		zap.L().Info("Executing Storage policy request: ", zap.String("request", policyReq))
		if err := r.db.Exec(ctx, policyReq); err != nil {
			zap.L().Error("error while setting storage policy", zap.Error(err))
			return &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting storage policy. Err=%v", err)}
		}
	}
	return nil
}

// GetDisks returns a list of disks {name, type} configured in clickhouse DB.
func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) {
	diskItems := []model.DiskItem{}

	query := "SELECT name,type FROM system.disks"
	if err := r.db.Select(ctx, &diskItems, query); err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting disks. Err=%v", err)}
	}

	return &diskItems, nil
}

func getLocalTableNameArray(tableNames []string) []string {
	var localTableNames []string
	for _, name := range tableNames {
		tableNameSplit := strings.Split(name, ".")
		localTableNames = append(localTableNames, tableNameSplit[0]+"."+strings.Split(tableNameSplit[1], "distributed_")[1])
	}
	return localTableNames
}

// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {

	parseTTL := func(queryResp string) (int, int) {

		zap.L().Info("Parsing TTL from: ", zap.String("queryResp", queryResp))
		deleteTTLExp := regexp.MustCompile(`toIntervalSecond\(([0-9]*)\)`)
		moveTTLExp := regexp.MustCompile(`toIntervalSecond\(([0-9]*)\) TO VOLUME`)

		var delTTL, moveTTL int = -1, -1

		m := deleteTTLExp.FindStringSubmatch(queryResp)
		if len(m) > 1 {
			seconds_int, err := strconv.Atoi(m[1])
			if err != nil {
				return -1, -1
			}
			delTTL = seconds_int / 3600
		}

		m = moveTTLExp.FindStringSubmatch(queryResp)
		if len(m) > 1 {
			seconds_int, err := strconv.Atoi(m[1])
			if err != nil {
				return -1, -1
			}
			moveTTL = seconds_int / 3600
		}

		return delTTL, moveTTL
	}

	getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
		var dbResp []model.DBResponseTTL

		query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleLocalTableName)

		err := r.db.Select(ctx, &dbResp, query)

		if err != nil {
			zap.L().Error("error while getting ttl", zap.Error(err))
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. Err=%v", err)}
		}
		if len(dbResp) == 0 {
			return nil, nil
		} else {
			return &dbResp[0], nil
		}
	}

	getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
		var dbResp []model.DBResponseTTL

		query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.traceLocalTableName, signozTraceDBName)

		err := r.db.Select(ctx, &dbResp, query)

		if err != nil {
			zap.L().Error("error while getting ttl", zap.Error(err))
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. Err=%v", err)}
		}
		if len(dbResp) == 0 {
			return nil, nil
		} else {
			return &dbResp[0], nil
		}
	}

	getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
		var dbResp []model.DBResponseTTL

		query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTableName, r.logsDB)

		err := r.db.Select(ctx, &dbResp, query)

		if err != nil {
			zap.L().Error("error while getting ttl", zap.Error(err))
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. Err=%v", err)}
		}
		if len(dbResp) == 0 {
			return nil, nil
		} else {
			return &dbResp[0], nil
		}
	}

	switch ttlParams.Type {
	case constants.TraceTTL:
		tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}

		tableNameArray = getLocalTableNameArray(tableNameArray)
		status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
		if err != nil {
			return nil, err
		}
		dbResp, err := getTracesTTL()
		if err != nil {
			return nil, err
		}
		ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
		if err != nil {
			return nil, err
		}
		ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
		if ttlQuery.ColdStorageTTL != -1 {
			ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
		}

		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
		return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil

	case constants.MetricsTTL:
		tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
		tableNameArray = getLocalTableNameArray(tableNameArray)
		status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
		if err != nil {
			return nil, err
		}
		dbResp, err := getMetricsTTL()
		if err != nil {
			return nil, err
		}
		ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
		if err != nil {
			return nil, err
		}
		ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
		if ttlQuery.ColdStorageTTL != -1 {
			ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
		}

		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
		return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil

	case constants.LogsTTL:
		tableNameArray := []string{r.logsDB + "." + r.logsTableName}
		tableNameArray = getLocalTableNameArray(tableNameArray)
		status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
		if err != nil {
			return nil, err
		}
		dbResp, err := getLogsTTL()
		if err != nil {
			return nil, err
		}
		ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
		if err != nil {
			return nil, err
		}
		ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
		if ttlQuery.ColdStorageTTL != -1 {
			ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
		}

		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
		return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil

	default:
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",
			ttlParams.Type)}
	}

}

func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.ListErrorsParams) (*[]model.Error, *model.ApiError) {

	var getErrorResponses []model.Error

	query := "SELECT any(exceptionMessage) as exceptionMessage, count() AS exceptionCount, min(timestamp) as firstSeen, max(timestamp) as lastSeen, groupID"
	if len(queryParams.ServiceName) != 0 {
		query = query + ", serviceName"
	} else {
		query = query + ", any(serviceName) as serviceName"
	}
	if len(queryParams.ExceptionType) != 0 {
		query = query + ", exceptionType"
	} else {
		query = query + ", any(exceptionType) as exceptionType"
	}
	query += fmt.Sprintf(" FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.errorTable)
	args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}

	if len(queryParams.ServiceName) != 0 {
		query = query + " AND serviceName ilike @serviceName"
		args = append(args, clickhouse.Named("serviceName", "%"+queryParams.ServiceName+"%"))
	}
	if len(queryParams.ExceptionType) != 0 {
		query = query + " AND exceptionType ilike @exceptionType"
		args = append(args, clickhouse.Named("exceptionType", "%"+queryParams.ExceptionType+"%"))
	}

	// create TagQuery from TagQueryParams
	tags := createTagQueryFromTagQueryParams(queryParams.Tags)
	subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
	query += subQuery
	args = append(args, argsSubQuery...)

	if errStatus != nil {
		zap.L().Error("Error in processing tags", zap.Error(errStatus))
		return nil, errStatus
	}
	query = query + " GROUP BY groupID"
	if len(queryParams.ServiceName) != 0 {
		query = query + ", serviceName"
	}
	if len(queryParams.ExceptionType) != 0 {
		query = query + ", exceptionType"
	}
	if len(queryParams.OrderParam) != 0 {
		if queryParams.Order == constants.Descending {
			query = query + " ORDER BY " + queryParams.OrderParam + " DESC"
		} else if queryParams.Order == constants.Ascending {
			query = query + " ORDER BY " + queryParams.OrderParam + " ASC"
		}
	}
	if queryParams.Limit > 0 {
		query = query + " LIMIT @limit"
		args = append(args, clickhouse.Named("limit", queryParams.Limit))
	}

	if queryParams.Offset > 0 {
		query = query + " OFFSET @offset"
		args = append(args, clickhouse.Named("offset", queryParams.Offset))
	}

	err := r.db.Select(ctx, &getErrorResponses, query, args...)
	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}

	return &getErrorResponses, nil
}

func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.CountErrorsParams) (uint64, *model.ApiError) {

	var errorCount uint64

	query := fmt.Sprintf("SELECT count(distinct(groupID)) FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.errorTable)
	args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
	if len(queryParams.ServiceName) != 0 {
		query = query + " AND serviceName ilike @serviceName"
		args = append(args, clickhouse.Named("serviceName", "%"+queryParams.ServiceName+"%"))
	}
	if len(queryParams.ExceptionType) != 0 {
		query = query + " AND exceptionType ilike @exceptionType"
		args = append(args, clickhouse.Named("exceptionType", "%"+queryParams.ExceptionType+"%"))
	}

	// create TagQuery from TagQueryParams
	tags := createTagQueryFromTagQueryParams(queryParams.Tags)
	subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
	query += subQuery
	args = append(args, argsSubQuery...)

	if errStatus != nil {
		zap.L().Error("Error in processing tags", zap.Error(errStatus))
		return 0, errStatus
	}

	err := r.db.QueryRow(ctx, query, args...).Scan(&errorCount)
	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return 0, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}

	return errorCount, nil
}

func (r *ClickHouseReader) GetErrorFromErrorID(ctx context.Context, queryParams *model.GetErrorParams) (*model.ErrorWithSpan, *model.ApiError) {

	if queryParams.ErrorID == "" {
		zap.L().Error("errorId missing from params")
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("ErrorID missing from params")}
	}
	var getErrorWithSpanReponse []model.ErrorWithSpan

	query := fmt.Sprintf("SELECT errorID, exceptionType, exceptionStacktrace, exceptionEscaped, exceptionMessage, timestamp, spanID, traceID, serviceName, groupID FROM %s.%s WHERE timestamp = @timestamp AND groupID = @groupID AND errorID = @errorID LIMIT 1", r.TraceDB, r.errorTable)
	args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

	err := r.db.Select(ctx, &getErrorWithSpanReponse, query, args...)
	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}

	if len(getErrorWithSpanReponse) > 0 {
		return &getErrorWithSpanReponse[0], nil
	} else {
		return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("Error/Exception not found")}
	}

}

func (r *ClickHouseReader) GetErrorFromGroupID(ctx context.Context, queryParams *model.GetErrorParams) (*model.ErrorWithSpan, *model.ApiError) {

	var getErrorWithSpanReponse []model.ErrorWithSpan

	query := fmt.Sprintf("SELECT errorID, exceptionType, exceptionStacktrace, exceptionEscaped, exceptionMessage, timestamp, spanID, traceID, serviceName, groupID FROM %s.%s WHERE timestamp = @timestamp AND groupID = @groupID LIMIT 1", r.TraceDB, r.errorTable)
	args := []interface{}{clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

	err := r.db.Select(ctx, &getErrorWithSpanReponse, query, args...)

	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}

	if len(getErrorWithSpanReponse) > 0 {
		return &getErrorWithSpanReponse[0], nil
	} else {
		return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("Error/Exception not found")}
	}

}

func (r *ClickHouseReader) GetNextPrevErrorIDs(ctx context.Context, queryParams *model.GetErrorParams) (*model.NextPrevErrorIDs, *model.ApiError) {

	if queryParams.ErrorID == "" {
		zap.L().Error("errorId missing from params")
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("ErrorID missing from params")}
	}
	var err *model.ApiError
	getNextPrevErrorIDsResponse := model.NextPrevErrorIDs{
		GroupID: queryParams.GroupID,
	}
	getNextPrevErrorIDsResponse.NextErrorID, getNextPrevErrorIDsResponse.NextTimestamp, err = r.getNextErrorID(ctx, queryParams)
	if err != nil {
		zap.L().Error("Unable to get next error ID due to err: ", zap.Error(err))
		return nil, err
	}
	getNextPrevErrorIDsResponse.PrevErrorID, getNextPrevErrorIDsResponse.PrevTimestamp, err = r.getPrevErrorID(ctx, queryParams)
	if err != nil {
		zap.L().Error("Unable to get prev error ID due to err: ", zap.Error(err))
		return nil, err
	}
	return &getNextPrevErrorIDsResponse, nil

}

func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {

	var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse

	query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp >= @timestamp AND errorID != @errorID ORDER BY timestamp ASC LIMIT 2", r.TraceDB, r.errorTable)
	args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

	err := r.db.Select(ctx, &getNextErrorIDReponse, query, args...)

	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}
	if len(getNextErrorIDReponse) == 0 {
		zap.L().Info("NextErrorID not found")
		return "", time.Time{}, nil
	} else if len(getNextErrorIDReponse) == 1 {
		zap.L().Info("NextErrorID found")
		return getNextErrorIDReponse[0].NextErrorID, getNextErrorIDReponse[0].NextTimestamp, nil
	} else {
		if getNextErrorIDReponse[0].Timestamp.UnixNano() == getNextErrorIDReponse[1].Timestamp.UnixNano() {
			var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse

			query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp = @timestamp AND errorID > @errorID ORDER BY errorID ASC LIMIT 1", r.TraceDB, r.errorTable)
			args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

			err := r.db.Select(ctx, &getNextErrorIDReponse, query, args...)

			zap.L().Info(query)

			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
			}
			if len(getNextErrorIDReponse) == 0 {
				var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse

				query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp > @timestamp ORDER BY timestamp ASC LIMIT 1", r.TraceDB, r.errorTable)
				args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

				err := r.db.Select(ctx, &getNextErrorIDReponse, query, args...)

				zap.L().Info(query)

				if err != nil {
					zap.L().Error("Error in processing sql query", zap.Error(err))
					return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
				}

				if len(getNextErrorIDReponse) == 0 {
					zap.L().Info("NextErrorID not found")
					return "", time.Time{}, nil
				} else {
					zap.L().Info("NextErrorID found")
					return getNextErrorIDReponse[0].NextErrorID, getNextErrorIDReponse[0].NextTimestamp, nil
				}
			} else {
				zap.L().Info("NextErrorID found")
				return getNextErrorIDReponse[0].NextErrorID, getNextErrorIDReponse[0].NextTimestamp, nil
			}
		} else {
			zap.L().Info("NextErrorID found")
			return getNextErrorIDReponse[0].NextErrorID, getNextErrorIDReponse[0].NextTimestamp, nil
		}
	}
}

func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {

	var getPrevErrorIDReponse []model.NextPrevErrorIDsDBResponse

	query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp <= @timestamp AND errorID != @errorID ORDER BY timestamp DESC LIMIT 2", r.TraceDB, r.errorTable)
	args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

	err := r.db.Select(ctx, &getPrevErrorIDReponse, query, args...)

	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
	}
	if len(getPrevErrorIDReponse) == 0 {
		zap.L().Info("PrevErrorID not found")
		return "", time.Time{}, nil
	} else if len(getPrevErrorIDReponse) == 1 {
		zap.L().Info("PrevErrorID found")
		return getPrevErrorIDReponse[0].PrevErrorID, getPrevErrorIDReponse[0].PrevTimestamp, nil
	} else {
		if getPrevErrorIDReponse[0].Timestamp.UnixNano() == getPrevErrorIDReponse[1].Timestamp.UnixNano() {
			var getPrevErrorIDReponse []model.NextPrevErrorIDsDBResponse

			query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp = @timestamp AND errorID < @errorID ORDER BY errorID DESC LIMIT 1", r.TraceDB, r.errorTable)
			args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

			err := r.db.Select(ctx, &getPrevErrorIDReponse, query, args...)

			zap.L().Info(query)

			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
			}
			if len(getPrevErrorIDReponse) == 0 {
				var getPrevErrorIDReponse []model.NextPrevErrorIDsDBResponse

				query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp < @timestamp ORDER BY timestamp DESC LIMIT 1", r.TraceDB, r.errorTable)
				args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))}

				err := r.db.Select(ctx, &getPrevErrorIDReponse, query, args...)

				zap.L().Info(query)

				if err != nil {
					zap.L().Error("Error in processing sql query", zap.Error(err))
					return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
				}

				if len(getPrevErrorIDReponse) == 0 {
					zap.L().Info("PrevErrorID not found")
					return "", time.Time{}, nil
				} else {
					zap.L().Info("PrevErrorID found")
					return getPrevErrorIDReponse[0].PrevErrorID, getPrevErrorIDReponse[0].PrevTimestamp, nil
				}
			} else {
				zap.L().Info("PrevErrorID found")
				return getPrevErrorIDReponse[0].PrevErrorID, getPrevErrorIDReponse[0].PrevTimestamp, nil
			}
		} else {
			zap.L().Info("PrevErrorID found")
			return getPrevErrorIDReponse[0].PrevErrorID, getPrevErrorIDReponse[0].PrevTimestamp, nil
		}
	}
}

func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) {

	var totalSpans uint64

	queryStr := fmt.Sprintf("SELECT count() from %s.%s;", signozTraceDBName, r.traceTableName)
	r.db.QueryRow(ctx, queryStr).Scan(&totalSpans)

	return totalSpans, nil
}

func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {
	var spansInLastHeartBeatInterval uint64
	r.db.QueryRow(ctx, fmt.Sprintf("SELECT count() from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - toIntervalMinute(%d))) - 1800 and timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, r.traceTableName, int(interval.Minutes()), int(interval.Minutes()))).Scan(&spansInLastHeartBeatInterval)

	return spansInLastHeartBeatInterval, nil
}

func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) {

	var totalLogs uint64

	queryStr := fmt.Sprintf("SELECT count() from %s.%s;", r.logsDB, r.logsTableName)
	r.db.QueryRow(ctx, queryStr).Scan(&totalLogs)

	return totalLogs, nil
}

func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
	metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
	var metricNamesToQuery []string
	for _, metricName := range metricNames {
		updatedMetadata, cacheErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
		if cacheErr != nil {
			zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr))
		}
		if metadata, exist := updatedMetadata[metricName]; exist {
			if _, exists := metricNameToTemporality[metricName]; !exists {
				metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
			}
			metricNameToTemporality[metricName][metadata.Temporality] = true
		} else {
			metricNamesToQuery = append(metricNamesToQuery, metricName)
		}
	}

	query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day)

	rows, err := r.db.Query(ctx, query, metricNames)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var metricName, temporality string
		err := rows.Scan(&metricName, &temporality)
		if err != nil {
			return nil, err
		}
		if _, ok := metricNameToTemporality[metricName]; !ok {
			metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
		}
		metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
	}
	return metricNameToTemporality, nil
}

func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {

	queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)

	rows, _ := r.db.Query(ctx, queryStr)

	var totalTS uint64
	totalTS = 0

	var maxTS uint64
	maxTS = 0

	count := 0
	for rows.Next() {

		var value uint64
		rows.Scan(&value)
		totalTS += value
		if count == 0 {
			maxTS = value
		}
		count += 1
	}

	timeSeriesData := map[string]interface{}{}
	timeSeriesData["totalTS"] = totalTS
	timeSeriesData["maxTS"] = maxTS

	return timeSeriesData, nil
}

func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {

	var totalSamples uint64

	queryStr := fmt.Sprintf("select count() from %s.%s where metric_name not like 'signoz_%%' and unix_milli > toUnixTimestamp(now()-toIntervalMinute(%d))*1000;", signozMetricDBName, signozSampleTableName, int(interval.Minutes()))

	r.db.QueryRow(ctx, queryStr).Scan(&totalSamples)

	return totalSamples, nil
}

func (r *ClickHouseReader) GetTotalSamples(ctx context.Context) (uint64, error) {
	var totalSamples uint64

	queryStr := fmt.Sprintf("select count() from %s.%s where metric_name not like 'signoz_%%';", signozMetricDBName, signozSampleTableName)

	r.db.QueryRow(ctx, queryStr).Scan(&totalSamples)

	return totalSamples, nil
}

func (r *ClickHouseReader) GetDistributedInfoInLastHeartBeatInterval(ctx context.Context) (map[string]interface{}, error) {

	clusterInfo := []model.ClusterInfo{}

	queryStr := `SELECT shard_num, shard_weight, replica_num, errors_count, slowdowns_count, estimated_recovery_time FROM system.clusters where cluster='cluster';`
	r.db.Select(ctx, &clusterInfo, queryStr)
	if len(clusterInfo) == 1 {
		return clusterInfo[0].GetMapFromStruct(), nil
	}

	return nil, nil
}

func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {

	var totalLogLines uint64

	queryStr := fmt.Sprintf("select count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))*1000000000;", r.logsDB, r.logsTableV2, int(interval.Minutes()))

	err := r.db.QueryRow(ctx, queryStr).Scan(&totalLogLines)

	return totalLogLines, err
}

func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) {
	queryStr := fmt.Sprintf(`select serviceName, resources_string['deployment.environment'] as env, 
	resources_string['telemetry.sdk.language'] as language from %s.%s 
	where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))
	group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes()))

	tagTelemetryDataList := []model.TagTelemetryData{}
	err := r.db.Select(ctx, &tagTelemetryDataList, queryStr)

	if err != nil {
		zap.L().Error("Error in processing sql query: ", zap.Error(err))
		return nil, err
	}

	tagsInfo := model.TagsInfo{
		Languages: make(map[string]interface{}),
		Services:  make(map[string]interface{}),
	}

	for _, tagTelemetryData := range tagTelemetryDataList {

		if len(tagTelemetryData.ServiceName) != 0 && strings.Contains(tagTelemetryData.ServiceName, "prod") {
			tagsInfo.Env = tagTelemetryData.ServiceName
		}
		if len(tagTelemetryData.Env) != 0 && strings.Contains(tagTelemetryData.Env, "prod") {
			tagsInfo.Env = tagTelemetryData.Env
		}
		if len(tagTelemetryData.Language) != 0 {
			tagsInfo.Languages[tagTelemetryData.Language] = struct{}{}
		}
		if len(tagTelemetryData.ServiceName) != 0 {
			tagsInfo.Services[tagTelemetryData.ServiceName] = struct{}{}
		}

	}

	return &tagsInfo, nil
}

// remove this after sometime
func removeUnderscoreDuplicateFields(fields []model.Field) []model.Field {
	lookup := map[string]model.Field{}
	for _, v := range fields {
		lookup[v.Name+v.DataType] = v
	}

	for k := range lookup {
		if strings.Contains(k, ".") {
			delete(lookup, strings.ReplaceAll(k, ".", "_"))
		}
	}

	updatedFields := []model.Field{}
	for _, v := range lookup {
		updatedFields = append(updatedFields, v)
	}
	return updatedFields
}

func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) {
	// response will contain top level fields from the otel log model
	response := model.GetFieldsResponse{
		Selected:    constants.StaticSelectedLogFields,
		Interesting: []model.Field{},
	}

	// get attribute keys
	attributes := []model.Field{}
	query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys)
	err := r.db.Select(ctx, &attributes, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	// get resource keys
	resources := []model.Field{}
	query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys)
	err = r.db.Select(ctx, &resources, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	//remove this code after sometime
	attributes = removeUnderscoreDuplicateFields(attributes)
	resources = removeUnderscoreDuplicateFields(resources)

	statements := []model.ShowCreateTableStatement{}
	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableName)
	err = r.db.Select(ctx, &statements, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	r.extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response)
	r.extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response)

	return &response, nil
}

func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, overrideFieldType string, fields *[]model.Field, response *model.GetFieldsResponse) {
	for _, field := range *fields {
		if overrideFieldType != "" {
			field.Type = overrideFieldType
		}
		// all static fields are assumed to be selected as we don't allow changing them
		if isColumn(tableStatement, field.Type, field.Name, field.DataType) {
			response.Selected = append(response.Selected, field)
		} else {
			response.Interesting = append(response.Interesting, field)
		}
	}
}

func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
	if !field.Selected {
		return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support."))
	}

	colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name)

	field.DataType = strings.ToLower(field.DataType)
	dataType := constants.MaterializedDataTypeMap[field.DataType]
	chDataType := constants.ChDataTypeMap[field.DataType]

	attrColName := fmt.Sprintf("%s_%s", field.Type, dataType)
	for _, table := range []string{r.logsLocalTableV2, r.logsTableV2} {
		q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
		query := fmt.Sprintf(q,
			r.logsDB, table,
			r.cluster,
			colname, chDataType,
			attrColName,
			field.Name,
		)
		err := r.db.Exec(ctx, query)
		if err != nil {
			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}

		query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
			r.logsDB, table,
			r.cluster,
			colname,
			attrColName,
			field.Name,
		)
		err = r.db.Exec(ctx, query)
		if err != nil {
			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}
	}

	// create the index
	if strings.ToLower(field.DataType) == "bool" {
		// there is no point in creating index for bool attributes as the cardinality is just 2
		return nil
	}

	if field.IndexType == "" {
		field.IndexType = constants.DefaultLogSkipIndexType
	}
	if field.IndexGranularity == 0 {
		field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
	}
	query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s  GRANULARITY %d",
		r.logsDB, r.logsLocalTableV2,
		r.cluster,
		colname,
		colname,
		field.IndexType,
		field.IndexGranularity,
	)
	err := r.db.Exec(ctx, query)
	if err != nil {
		return &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}
	return nil
}

func (r *ClickHouseReader) GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) {
	// response will contain top level fields from the otel trace model
	response := model.GetFieldsResponse{
		Selected:    []model.Field{},
		Interesting: []model.Field{},
	}

	// get the top level selected fields
	for _, field := range constants.NewStaticFieldsTraces {
		if (v3.AttributeKey{} == field) {
			continue
		}
		response.Selected = append(response.Selected, model.Field{
			Name:     field.Key,
			DataType: field.DataType.String(),
			Type:     constants.Static,
		})
	}

	// get attribute keys
	attributes := []model.Field{}
	query := fmt.Sprintf("SELECT tagKey, tagType, dataType from %s.%s group by tagKey, tagType, dataType", r.TraceDB, r.spanAttributesKeysTable)
	rows, err := r.db.Query(ctx, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}
	defer rows.Close()

	var tagKey string
	var dataType string
	var tagType string
	for rows.Next() {
		if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
			return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}
		attributes = append(attributes, model.Field{
			Name:     tagKey,
			DataType: dataType,
			Type:     tagType,
		})
	}

	statements := []model.ShowCreateTableStatement{}
	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName)
	err = r.db.Select(ctx, &statements, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	r.extractSelectedAndInterestingFields(statements[0].Statement, "", &attributes, &response)

	return &response, nil

}

func (r *ClickHouseReader) UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError {
	if !field.Selected {
		return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support."))
	}

	// name of the materialized column
	colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name)

	field.DataType = strings.ToLower(field.DataType)

	// dataType and chDataType of the materialized column
	chDataType := constants.ChDataTypeMap[field.DataType]
	dataType := constants.MaterializedDataTypeMap[field.DataType]

	// typeName: tag => attributes, resource => resources
	typeName := field.Type
	if field.Type == string(v3.AttributeKeyTypeTag) {
		typeName = constants.Attributes
	} else if field.Type == string(v3.AttributeKeyTypeResource) {
		typeName = constants.Resources
	}

	attrColName := fmt.Sprintf("%s_%s", typeName, dataType)
	for _, table := range []string{r.traceLocalTableName, r.traceTableName} {
		q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
		query := fmt.Sprintf(q,
			r.TraceDB, table,
			r.cluster,
			colname, chDataType,
			attrColName,
			field.Name,
		)
		err := r.db.Exec(ctx, query)
		if err != nil {
			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}

		query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
			r.TraceDB, table,
			r.cluster,
			colname,
			attrColName,
			field.Name,
		)
		err = r.db.Exec(ctx, query)
		if err != nil {
			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}
	}

	// create the index
	if strings.ToLower(field.DataType) == "bool" {
		// there is no point in creating index for bool attributes as the cardinality is just 2
		return nil
	}

	if field.IndexType == "" {
		field.IndexType = constants.DefaultLogSkipIndexType
	}
	if field.IndexGranularity == 0 {
		field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
	}
	query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s  GRANULARITY %d",
		r.TraceDB, r.traceLocalTableName,
		r.cluster,
		colname,
		colname,
		field.IndexType,
		field.IndexGranularity,
	)
	err := r.db.Exec(ctx, query)
	if err != nil {
		return &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	// add a default minmax index for numbers
	if dataType == "number" {
		query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_minmax_idx` (`%s`) TYPE minmax  GRANULARITY 1",
			r.TraceDB, r.traceLocalTableName,
			r.cluster,
			colname,
			colname,
		)
		err = r.db.Exec(ctx, query)
		if err != nil {
			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}
	}

	return nil
}

func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError) {
	response := []model.SignozLog{}
	fields, apiErr := r.GetLogFields(ctx)
	if apiErr != nil {
		return nil, apiErr
	}

	isPaginatePrev := logs.CheckIfPrevousPaginateAndModifyOrder(params)
	filterSql, lenFilters, err := logs.GenerateSQLWhere(fields, params)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
	}

	data := map[string]interface{}{
		"lenFilters": lenFilters,
	}
	if lenFilters != 0 {
		claims, errv2 := authtypes.ClaimsFromContext(ctx)
		if errv2 == nil {
			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, claims.Email, true, false)
		}
	}

	query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)

	if filterSql != "" {
		query = fmt.Sprintf("%s where %s", query, filterSql)
	}

	query = fmt.Sprintf("%s order by %s %s limit %d", query, params.OrderBy, params.Order, params.Limit)
	err = r.db.Select(ctx, &response, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}
	if isPaginatePrev {
		// rever the results from db
		for i, j := 0, len(response)-1; i < j; i, j = i+1, j-1 {
			response[i], response[j] = response[j], response[i]
		}
	}
	return &response, nil
}

func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) {

	fields, apiErr := r.GetLogFields(ctx)
	if apiErr != nil {
		client.Error <- apiErr.Err
		return
	}

	filterSql, lenFilters, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{
		Query: client.Filter.Query,
	})

	data := map[string]interface{}{
		"lenFilters": lenFilters,
	}
	if lenFilters != 0 {
		claims, errv2 := authtypes.ClaimsFromContext(ctx)
		if errv2 == nil {
			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, claims.Email, true, false)
		}
	}

	if err != nil {
		client.Error <- err
		return
	}

	query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)

	tsStart := uint64(time.Now().UnixNano())
	if client.Filter.TimestampStart != 0 {
		tsStart = client.Filter.TimestampStart
	}

	var idStart string
	if client.Filter.IdGt != "" {
		idStart = client.Filter.IdGt
	}

	ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			done := true
			client.Done <- &done
			zap.L().Debug("closing go routine : " + client.Name)
			return
		case <-ticker.C:
			// get the new 100 logs as anything more older won't make sense
			tmpQuery := fmt.Sprintf("%s where timestamp >='%d'", query, tsStart)
			if filterSql != "" {
				tmpQuery = fmt.Sprintf("%s and %s", tmpQuery, filterSql)
			}
			if idStart != "" {
				tmpQuery = fmt.Sprintf("%s and id > '%s'", tmpQuery, idStart)
			}
			tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
			response := []model.SignozLog{}
			err := r.db.Select(ctx, &response, tmpQuery)
			if err != nil {
				zap.L().Error("Error while getting logs", zap.Error(err))
				client.Error <- err
				return
			}
			for i := len(response) - 1; i >= 0; i-- {
				select {
				case <-ctx.Done():
					done := true
					client.Done <- &done
					zap.L().Debug("closing go routine while sending logs : " + client.Name)
					return
				default:
					client.Logs <- &response[i]
					if i == 0 {
						tsStart = response[i].Timestamp
						idStart = response[i].ID
					}
				}
			}
		}
	}
}

func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) {
	logAggregatesDBResponseItems := []model.LogsAggregatesDBResponseItem{}

	function := "toFloat64(count()) as value"
	if params.Function != "" {
		function = fmt.Sprintf("toFloat64(%s) as value", params.Function)
	}

	fields, apiErr := r.GetLogFields(ctx)
	if apiErr != nil {
		return nil, apiErr
	}

	filterSql, lenFilters, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{
		Query: params.Query,
	})
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
	}

	data := map[string]interface{}{
		"lenFilters": lenFilters,
	}
	if lenFilters != 0 {
		claims, errv2 := authtypes.ClaimsFromContext(ctx)
		if errv2 == nil {
			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, claims.Email, true, false)
		}
	}

	query := ""
	if params.GroupBy != "" {
		query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as ts_start_interval, toString(%s) as groupBy, "+
			"%s "+
			"FROM %s.%s WHERE (timestamp >= '%d' AND timestamp <= '%d' )",
			params.StepSeconds/60, params.GroupBy, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd)
	} else {
		query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as ts_start_interval, "+
			"%s "+
			"FROM %s.%s WHERE (timestamp >= '%d' AND timestamp <= '%d' )",
			params.StepSeconds/60, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd)
	}
	if filterSql != "" {
		query = fmt.Sprintf("%s AND ( %s ) ", query, filterSql)
	}
	if params.GroupBy != "" {
		query = fmt.Sprintf("%s GROUP BY ts_start_interval, toString(%s) as groupBy ORDER BY ts_start_interval", query, params.GroupBy)
	} else {
		query = fmt.Sprintf("%s GROUP BY ts_start_interval ORDER BY ts_start_interval", query)
	}

	err = r.db.Select(ctx, &logAggregatesDBResponseItems, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	aggregateResponse := model.GetLogsAggregatesResponse{
		Items: make(map[int64]model.LogsAggregatesResponseItem),
	}

	for i := range logAggregatesDBResponseItems {
		if elem, ok := aggregateResponse.Items[int64(logAggregatesDBResponseItems[i].Timestamp)]; ok {
			if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
				elem.GroupBy[logAggregatesDBResponseItems[i].GroupBy] = logAggregatesDBResponseItems[i].Value
			}
			aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = elem
		} else {
			if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
				aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{
					Timestamp: logAggregatesDBResponseItems[i].Timestamp,
					GroupBy:   map[string]interface{}{logAggregatesDBResponseItems[i].GroupBy: logAggregatesDBResponseItems[i].Value},
				}
			} else if params.GroupBy == "" {
				aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{
					Timestamp: logAggregatesDBResponseItems[i].Timestamp,
					Value:     logAggregatesDBResponseItems[i].Value,
				}
			}
		}

	}

	return &aggregateResponse, nil
}

func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error) {
	var result = model.DashboardVar{VariableValues: make([]interface{}, 0)}
	rows, err := r.db.Query(ctx, query)

	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, err
	}

	var (
		columnTypes = rows.ColumnTypes()
		vars        = make([]interface{}, len(columnTypes))
	)
	for i := range columnTypes {
		vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
	}

	defer rows.Close()
	for rows.Next() {
		if err := rows.Scan(vars...); err != nil {
			return nil, err
		}
		for _, v := range vars {
			switch v := v.(type) {
			case *string, *int8, *int16, *int32, *int64, *uint8, *uint16, *uint32, *uint64, *float32, *float64, *time.Time, *bool:
				result.VariableValues = append(result.VariableValues, reflect.ValueOf(v).Elem().Interface())
			default:
				return nil, fmt.Errorf("unsupported value type encountered")
			}
		}
	}
	return &result, nil
}

func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) {

	var query string
	var err error
	var rows driver.Rows
	var response v3.AggregateAttributeResponse

	query = fmt.Sprintf("SELECT metric_name, type, is_monotonic, temporality FROM %s.%s WHERE metric_name ILIKE $1 GROUP BY metric_name, type, is_monotonic, temporality", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	seen := make(map[string]struct{})

	var metricName, typ, temporality string
	var isMonotonic bool
	for rows.Next() {
		if err := rows.Scan(&metricName, &typ, &isMonotonic, &temporality); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		if skipDotNames && strings.Contains(metricName, ".") {
			continue
		}

		if skipSignozMetrics && strings.HasPrefix(metricName, "signoz_") {
			continue
		}

		metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
		if apiError != nil {
			zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
		}
		if updatedMetadata, exist := metadata[metricName]; exist {
			typ = string(updatedMetadata.MetricType)
			isMonotonic = updatedMetadata.IsMonotonic
			temporality = string(updatedMetadata.Temporality)
		}

		// Non-monotonic cumulative sums are treated as gauges
		if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
			typ = "Gauge"
		}
		// unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
		key := v3.AttributeKey{
			Key:      metricName,
			DataType: v3.AttributeKeyDataTypeFloat64,
			Type:     v3.AttributeKeyType(typ),
			IsColumn: true,
		}
		// remove duplicates
		if _, ok := seen[metricName+typ]; ok {
			continue
		}
		seen[metricName+typ] = struct{}{}
		response.AttributeKeys = append(response.AttributeKeys, key)
	}

	return &response, nil
}

func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {

	var query string
	var err error
	var rows driver.Rows
	var response v3.FilterAttributeKeyResponse

	// skips the internal attributes i.e attributes starting with __
	query = fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND __normalized = true GROUP BY tagKeys) WHERE distinctTagKey ILIKE $3 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	rows, err = r.db.Query(ctx, query, req.AggregateAttribute, common.PastDayRoundOff(), fmt.Sprintf("%%%s%%", req.SearchText))
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	var attributeKey string
	for rows.Next() {
		if err := rows.Scan(&attributeKey); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		key := v3.AttributeKey{
			Key:      attributeKey,
			DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
			Type:     v3.AttributeKeyTypeTag,
			IsColumn: false,
		}
		response.AttributeKeys = append(response.AttributeKeys, key)
	}

	return &response, nil
}

func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {

	var query string
	var err error
	var rows driver.Rows
	var attributeValues v3.FilterAttributeValueResponse

	query = fmt.Sprintf("SELECT JSONExtractString(labels, $1) AS tagValue FROM %s.%s WHERE metric_name IN $2 AND JSONExtractString(labels, $3) ILIKE $4 AND unix_milli >= $5 GROUP BY tagValue", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	names := []string{req.AggregateAttribute}
	if _, ok := metrics.MetricsUnderTransition[req.AggregateAttribute]; ok {
		names = append(names, metrics.MetricsUnderTransition[req.AggregateAttribute])
	}

	rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, names, req.FilterAttributeKey, fmt.Sprintf("%%%s%%", req.SearchText), common.PastDayRoundOff())

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	var atrributeValue string
	for rows.Next() {
		if err := rows.Scan(&atrributeValue); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		// https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72
		// this may change in future if we use OTLP as the data model
		attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, atrributeValue)
	}

	return &attributeValues, nil
}

func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {

	unixMilli := common.PastDayRoundOff()

	// Note: metric metadata should be accessible regardless of the time range selection
	// our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the
	// amount of data scanned
	query := fmt.Sprintf("SELECT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY temporality, description, type, unit, is_monotonic", signozMetricDBName, signozTSTableNameV41Day)
	rows, err := r.db.Query(ctx, query, metricName, unixMilli)
	if err != nil {
		zap.L().Error("Error while fetching metric metadata", zap.Error(err))
		return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error())
	}
	defer rows.Close()

	var deltaExists, isMonotonic bool
	var temporality, description, metricType, unit string
	for rows.Next() {
		if err := rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		if temporality == string(v3.Delta) {
			deltaExists = true
		}
	}
	metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
	if apiError != nil {
		zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError))
	}
	if updatedMetadata, exist := metadata[metricName]; exist {
		metricType = string(updatedMetadata.MetricType)
		temporality = string(updatedMetadata.Temporality)
		if temporality == string(v3.Delta) {
			deltaExists = true
		}
		isMonotonic = updatedMetadata.IsMonotonic
		if updatedMetadata.Description != "" {
			description = updatedMetadata.Description
		}
		if updatedMetadata.Unit != "" {
			unit = updatedMetadata.Unit
		}
	}

	query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
	rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName)
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	var leFloat64 []float64
	for rows.Next() {
		var leStr string
		if err := rows.Scan(&leStr); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		le, err := strconv.ParseFloat(leStr, 64)
		// ignore the error and continue if the value is not a float
		// ideally this should not happen but we have seen ClickHouse
		// returning empty string for some values
		if err != nil {
			zap.L().Error("error while parsing le value", zap.Error(err))
			continue
		}
		if math.IsInf(le, 0) {
			continue
		}
		leFloat64 = append(leFloat64, le)
	}

	return &v3.MetricMetadataResponse{
		Delta:       deltaExists,
		Le:          leFloat64,
		Description: description,
		Unit:        unit,
		Type:        metricType,
		IsMonotonic: isMonotonic,
		Temporality: temporality,
	}, nil
}

// GetCountOfThings returns the count of things in the query
// This is a generic function that can be used to check if any data exists for a given query
func (r *ClickHouseReader) GetCountOfThings(ctx context.Context, query string) (uint64, error) {
	var count uint64
	err := r.db.QueryRow(ctx, query).Scan(&count)
	if err != nil {
		return 0, err
	}
	return count, nil
}

func (r *ClickHouseReader) GetLatestReceivedMetric(
	ctx context.Context, metricNames []string, labelValues map[string]string,
) (*model.MetricStatus, *model.ApiError) {
	// at least 1 metric name must be specified.
	// this query can be too slow otherwise.
	if len(metricNames) < 1 {
		return nil, model.BadRequest(fmt.Errorf("atleast 1 metric name must be specified"))
	}

	quotedMetricNames := []string{}
	for _, m := range metricNames {
		quotedMetricNames = append(quotedMetricNames, utils.ClickHouseFormattedValue(m))
	}
	commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ")

	whereClauseParts := []string{
		fmt.Sprintf(`metric_name in (%s)`, commaSeparatedMetricNames),
	}

	if labelValues != nil {
		for label, val := range labelValues {
			whereClauseParts = append(
				whereClauseParts,
				fmt.Sprintf(`JSONExtractString(labels, '%s') = '%s'`, label, val),
			)
		}
	}

	if len(whereClauseParts) < 1 {
		return nil, nil
	}

	whereClause := strings.Join(whereClauseParts, " AND ")

	query := fmt.Sprintf(`
		SELECT metric_name, anyLast(labels), max(unix_milli)
		from %s.%s
		where %s
		group by metric_name
		limit 1
		`, signozMetricDBName, signozTSTableNameV4, whereClause,
	)

	rows, err := r.db.Query(ctx, query)
	if err != nil {
		return nil, model.InternalError(fmt.Errorf(
			"couldn't query clickhouse for received metrics status: %w", err,
		))
	}
	defer rows.Close()

	var result *model.MetricStatus

	if rows.Next() {

		result = &model.MetricStatus{}
		var labelsJson string

		err := rows.Scan(
			&result.MetricName,
			&labelsJson,
			&result.LastReceivedTsMillis,
		)
		if err != nil {
			return nil, model.InternalError(fmt.Errorf(
				"couldn't scan metric status row: %w", err,
			))
		}

		err = json.Unmarshal([]byte(labelsJson), &result.LastReceivedLabels)
		if err != nil {
			return nil, model.InternalError(fmt.Errorf(
				"couldn't unmarshal metric labels json: %w", err,
			))
		}
	}

	return result, nil
}

func isColumn(tableStatement, attrType, field, datType string) bool {
	name := fmt.Sprintf("`%s`", utils.GetClickhouseColumnNameV2(attrType, datType, field))
	return strings.Contains(tableStatement, fmt.Sprintf("%s ", name))
}

func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {

	var query string
	var err error
	var rows driver.Rows
	var response v3.AggregateAttributeResponse
	var stringAllowed bool

	where := ""
	switch req.Operator {
	case
		v3.AggregateOperatorCountDistinct,
		v3.AggregateOperatorCount:
		where = "tag_key ILIKE $1"
		stringAllowed = true
	case
		v3.AggregateOperatorRateSum,
		v3.AggregateOperatorRateMax,
		v3.AggregateOperatorRateAvg,
		v3.AggregateOperatorRate,
		v3.AggregateOperatorRateMin,
		v3.AggregateOperatorP05,
		v3.AggregateOperatorP10,
		v3.AggregateOperatorP20,
		v3.AggregateOperatorP25,
		v3.AggregateOperatorP50,
		v3.AggregateOperatorP75,
		v3.AggregateOperatorP90,
		v3.AggregateOperatorP95,
		v3.AggregateOperatorP99,
		v3.AggregateOperatorAvg,
		v3.AggregateOperatorSum,
		v3.AggregateOperatorMin,
		v3.AggregateOperatorMax:
		where = "tag_key ILIKE $1 AND (tag_data_type='int64' or tag_data_type='float64')"
		stringAllowed = false
	case
		v3.AggregateOperatorNoOp:
		return &v3.AggregateAttributeResponse{}, nil
	default:
		return nil, fmt.Errorf("unsupported aggregate operator")
	}

	query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type from %s.%s WHERE %s and tag_type != 'logfield' limit $2", r.logsDB, r.logsTagAttributeTableV2, where)
	rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	statements := []model.ShowCreateTableStatement{}
	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableName)
	err = r.db.Select(ctx, &statements, query)
	if err != nil {
		return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
	}

	var tagKey string
	var dataType string
	var attType string
	for rows.Next() {
		if err := rows.Scan(&tagKey, &attType, &dataType); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		key := v3.AttributeKey{
			Key:      tagKey,
			DataType: v3.AttributeKeyDataType(dataType),
			Type:     v3.AttributeKeyType(attType),
			IsColumn: isColumn(statements[0].Statement, attType, tagKey, dataType),
		}
		response.AttributeKeys = append(response.AttributeKeys, key)
	}
	// add other attributes
	for _, field := range constants.StaticFieldsLogsV3 {
		if (!stringAllowed && field.DataType == v3.AttributeKeyDataTypeString) || (v3.AttributeKey{} == field) {
			continue
		} else if len(req.SearchText) == 0 || strings.Contains(field.Key, req.SearchText) {
			response.AttributeKeys = append(response.AttributeKeys, field)
		}
	}

	return &response, nil
}

func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
	var query string
	var err error
	var rows driver.Rows
	var response v3.FilterAttributeKeyResponse

	tagTypeFilter := `tag_type != 'logfield'`
	if req.TagType != "" {
		tagTypeFilter = fmt.Sprintf(`tag_type != 'logfield' and tag_type = '%s'`, req.TagType)
	}

	if len(req.SearchText) != 0 {
		query = fmt.Sprintf("select distinct tag_key, tag_type, tag_data_type from  %s.%s where %s and tag_key ILIKE $1 limit $2", r.logsDB, r.logsTagAttributeTableV2, tagTypeFilter)
		rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)
	} else {
		query = fmt.Sprintf("select distinct tag_key, tag_type, tag_data_type from %s.%s where %s limit $1", r.logsDB, r.logsTagAttributeTableV2, tagTypeFilter)
		rows, err = r.db.Query(ctx, query, req.Limit)
	}

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	statements := []model.ShowCreateTableStatement{}
	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableName)
	err = r.db.Select(ctx, &statements, query)
	if err != nil {
		return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
	}

	var attributeKey string
	var attributeDataType string
	var tagType string
	for rows.Next() {
		if err := rows.Scan(&attributeKey, &tagType, &attributeDataType); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}

		key := v3.AttributeKey{
			Key:      attributeKey,
			DataType: v3.AttributeKeyDataType(attributeDataType),
			Type:     v3.AttributeKeyType(tagType),
			IsColumn: isColumn(statements[0].Statement, tagType, attributeKey, attributeDataType),
		}

		response.AttributeKeys = append(response.AttributeKeys, key)
	}

	// add other attributes only when the tagType is not specified
	// i.e retrieve all attributes
	if req.TagType == "" {
		for _, f := range constants.StaticFieldsLogsV3 {
			if (v3.AttributeKey{} == f) {
				continue
			}
			if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) {
				response.AttributeKeys = append(response.AttributeKeys, f)
			}
		}
	}

	return &response, nil
}

func (r *ClickHouseReader) FetchRelatedValues(ctx context.Context, req *v3.FilterAttributeValueRequest) ([]string, error) {
	var andConditions []string

	andConditions = append(andConditions, fmt.Sprintf("unix_milli >= %d", req.StartTimeMillis))
	andConditions = append(andConditions, fmt.Sprintf("unix_milli <= %d", req.EndTimeMillis))

	if len(req.ExistingFilterItems) != 0 {
		for _, item := range req.ExistingFilterItems {
			// we only support string for related values
			if item.Key.DataType != v3.AttributeKeyDataTypeString {
				continue
			}

			var colName string
			switch item.Key.Type {
			case v3.AttributeKeyTypeResource:
				colName = "resource_attributes"
			case v3.AttributeKeyTypeTag:
				colName = "attributes"
			default:
				// we only support resource and tag for related values as of now
				continue
			}
			// IN doesn't make use of map value index, we convert it to = or !=
			operator := item.Operator
			if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorIn {
				operator = "="
			} else if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorNotIn {
				operator = "!="
			}
			addCondition := func(val string) {
				andConditions = append(andConditions, fmt.Sprintf("mapContains(%s, '%s') AND %s['%s'] %s %s", colName, item.Key.Key, colName, item.Key.Key, operator, val))
			}
			switch v := item.Value.(type) {
			case string:
				fmtVal := utils.ClickHouseFormattedValue(v)
				addCondition(fmtVal)
			case []string:
				for _, val := range v {
					fmtVal := utils.ClickHouseFormattedValue(val)
					addCondition(fmtVal)
				}
			case []interface{}:
				for _, val := range v {
					fmtVal := utils.ClickHouseFormattedValue(val)
					addCondition(fmtVal)
				}
			}
		}
	}
	whereClause := strings.Join(andConditions, " AND ")

	var selectColumn string
	switch req.TagType {
	case v3.TagTypeResource:
		selectColumn = "resource_attributes" + "['" + req.FilterAttributeKey + "']"
	case v3.TagTypeTag:
		selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']"
	default:
		selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']"
	}

	filterSubQuery := fmt.Sprintf(
		"SELECT DISTINCT %s FROM %s.%s WHERE %s LIMIT 100",
		selectColumn,
		r.metadataDB,
		r.metadataTable,
		whereClause,
	)
	zap.L().Debug("filterSubQuery for related values", zap.String("query", filterSubQuery))

	rows, err := r.db.Query(ctx, filterSubQuery)
	if err != nil {
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	var attributeValues []string
	for rows.Next() {
		var value string
		if err := rows.Scan(&value); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		if value != "" {
			attributeValues = append(attributeValues, value)
		}
	}

	return attributeValues, nil
}

func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
	var err error
	var filterValueColumn string
	var rows driver.Rows
	var attributeValues v3.FilterAttributeValueResponse

	// if dataType or tagType is not present return empty response
	if len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 {
		// also check if it is not a top level key
		if _, ok := constants.StaticFieldsLogsV3[req.FilterAttributeKey]; !ok {
			return &v3.FilterAttributeValueResponse{}, nil
		}
	}

	// ignore autocomplete request for body
	if req.FilterAttributeKey == "body" || req.FilterAttributeKey == "__attrs" {
		return &v3.FilterAttributeValueResponse{}, nil
	}

	// if data type is bool, return true and false
	if req.FilterAttributeKeyDataType == v3.AttributeKeyDataTypeBool {
		return &v3.FilterAttributeValueResponse{
			BoolAttributeValues: []bool{true, false},
		}, nil
	}

	query := "select distinct"
	switch req.FilterAttributeKeyDataType {
	case v3.AttributeKeyDataTypeInt64:
		filterValueColumn = "number_value"
	case v3.AttributeKeyDataTypeFloat64:
		filterValueColumn = "number_value"
	case v3.AttributeKeyDataTypeString:
		filterValueColumn = "string_value"
	}

	searchText := fmt.Sprintf("%%%s%%", req.SearchText)

	// check if the tagKey is a topLevelColumn
	if _, ok := constants.StaticFieldsLogsV3[req.FilterAttributeKey]; ok {
		// query the column for the last 48 hours
		filterValueColumnWhere := req.FilterAttributeKey
		selectKey := req.FilterAttributeKey
		if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
			filterValueColumnWhere = fmt.Sprintf("toString(%s)", req.FilterAttributeKey)
			selectKey = fmt.Sprintf("toInt64(%s)", req.FilterAttributeKey)
		}

		// prepare the query and run
		if len(req.SearchText) != 0 {
			query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsLocalTableName, filterValueColumnWhere)
			rows, err = r.db.Query(ctx, query, searchText, req.Limit)
		} else {
			query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsLocalTableName)
			rows, err = r.db.Query(ctx, query, req.Limit)
		}
	} else if len(req.SearchText) != 0 {
		filterValueColumnWhere := filterValueColumn
		if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
			filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn)
		}
		query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND %s ILIKE $2 AND tag_type=$3 LIMIT $4", filterValueColumn, r.logsDB, r.logsTagAttributeTableV2, filterValueColumnWhere)
		rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit)
	} else {
		query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND tag_type=$2 LIMIT $3", filterValueColumn, r.logsDB, r.logsTagAttributeTableV2)
		rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, req.TagType, req.Limit)
	}

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	var strAttributeValue string
	var float64AttributeValue sql.NullFloat64
	var int64AttributeValue sql.NullInt64
	for rows.Next() {
		switch req.FilterAttributeKeyDataType {
		case v3.AttributeKeyDataTypeInt64:
			if err := rows.Scan(&int64AttributeValue); err != nil {
				return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
			}
			if int64AttributeValue.Valid {
				attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, int64AttributeValue.Int64)
			}
		case v3.AttributeKeyDataTypeFloat64:
			if err := rows.Scan(&float64AttributeValue); err != nil {
				return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
			}
			if float64AttributeValue.Valid {
				attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, float64AttributeValue.Float64)
			}
		case v3.AttributeKeyDataTypeString:
			if err := rows.Scan(&strAttributeValue); err != nil {
				return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
			}
			attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue)
		}
	}

	if req.IncludeRelated {
		relatedValues, _ := r.FetchRelatedValues(ctx, req)
		attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{
			StringAttributeValues: relatedValues,
		}
	}

	return &attributeValues, nil

}

func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([]string, map[string]string, []map[string]string, *v3.Point) {
	// Each row will have a value and a timestamp, and an optional list of label values
	// example: {Timestamp: ..., Value: ...}
	// The timestamp may also not present in some cases where the time series is reduced to single value
	var point v3.Point

	// groupBy is a container to hold label values for the current point
	// example: ["frontend", "/fetch"]
	var groupBy []string

	var groupAttributesArray []map[string]string
	// groupAttributes is a container to hold the key-value pairs for the current
	// metric point.
	// example: {"serviceName": "frontend", "operation": "/fetch"}
	groupAttributes := make(map[string]string)

	isValidPoint := false

	for idx, v := range vars {
		colName := columnNames[idx]
		switch v := v.(type) {
		case *string:
			// special case for returning all labels in metrics datasource
			if colName == "fullLabels" {
				var metric map[string]string
				err := json.Unmarshal([]byte(*v), &metric)
				if err != nil {
					zap.L().Error("unexpected error encountered", zap.Error(err))
				}
				for key, val := range metric {
					groupBy = append(groupBy, val)
					if _, ok := groupAttributes[key]; !ok {
						groupAttributesArray = append(groupAttributesArray, map[string]string{key: val})
					}
					groupAttributes[key] = val
				}
			} else {
				groupBy = append(groupBy, *v)
				if _, ok := groupAttributes[colName]; !ok {
					groupAttributesArray = append(groupAttributesArray, map[string]string{colName: *v})
				}
				groupAttributes[colName] = *v
			}
		case *time.Time:
			point.Timestamp = v.UnixMilli()
		case *float64, *float32:
			if _, ok := constants.ReservedColumnTargetAliases[colName]; ok || countOfNumberCols == 1 {
				isValidPoint = true
				point.Value = float64(reflect.ValueOf(v).Elem().Float())
			} else {
				val := strconv.FormatFloat(reflect.ValueOf(v).Elem().Float(), 'f', -1, 64)
				groupBy = append(groupBy, val)
				if _, ok := groupAttributes[colName]; !ok {
					groupAttributesArray = append(groupAttributesArray, map[string]string{colName: val})
				}
				groupAttributes[colName] = val
			}
		case **float64, **float32:
			val := reflect.ValueOf(v)
			if val.IsValid() && !val.IsNil() && !val.Elem().IsNil() {
				value := reflect.ValueOf(v).Elem().Elem().Float()
				if _, ok := constants.ReservedColumnTargetAliases[colName]; ok || countOfNumberCols == 1 {
					isValidPoint = true
					point.Value = value
				} else {
					val := strconv.FormatFloat(value, 'f', -1, 64)
					groupBy = append(groupBy, val)
					if _, ok := groupAttributes[colName]; !ok {
						groupAttributesArray = append(groupAttributesArray, map[string]string{colName: val})
					}
					groupAttributes[colName] = val
				}
			}
		case *uint, *uint8, *uint64, *uint16, *uint32:
			if _, ok := constants.ReservedColumnTargetAliases[colName]; ok || countOfNumberCols == 1 {
				isValidPoint = true
				point.Value = float64(reflect.ValueOf(v).Elem().Uint())
			} else {
				groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()))
				if _, ok := groupAttributes[colName]; !ok {
					groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())})
				}
				groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())
			}
		case **uint, **uint8, **uint64, **uint16, **uint32:
			val := reflect.ValueOf(v)
			if val.IsValid() && !val.IsNil() && !val.Elem().IsNil() {
				value := reflect.ValueOf(v).Elem().Elem().Uint()
				if _, ok := constants.ReservedColumnTargetAliases[colName]; ok || countOfNumberCols == 1 {
					isValidPoint = true
					point.Value = float64(value)
				} else {
					groupBy = append(groupBy, fmt.Sprintf("%v", value))
					if _, ok := groupAttributes[colName]; !ok {
						groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", value)})
					}
					groupAttributes[colName] = fmt.Sprintf("%v", value)
				}
			}
		case *int, *int8, *int16, *int32, *int64:
			if _, ok := constants.ReservedColumnTargetAliases[colName]; ok || countOfNumberCols == 1 {
				isValidPoint = true
				point.Value = float64(reflect.ValueOf(v).Elem().Int())
			} else {
				groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()))
				if _, ok := groupAttributes[colName]; !ok {
					groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())})
				}
				groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())
			}
		case **int, **int8, **int16, **int32, **int64:
			val := reflect.ValueOf(v)
			if val.IsValid() && !val.IsNil() && !val.Elem().IsNil() {
				value := reflect.ValueOf(v).Elem().Elem().Int()
				if _, ok := constants.ReservedColumnTargetAliases[colName]; ok || countOfNumberCols == 1 {
					isValidPoint = true
					point.Value = float64(value)
				} else {
					groupBy = append(groupBy, fmt.Sprintf("%v", value))
					if _, ok := groupAttributes[colName]; !ok {
						groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", value)})
					}
					groupAttributes[colName] = fmt.Sprintf("%v", value)
				}
			}
		case *bool:
			groupBy = append(groupBy, fmt.Sprintf("%v", *v))
			if _, ok := groupAttributes[colName]; !ok {
				groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", *v)})
			}
			groupAttributes[colName] = fmt.Sprintf("%v", *v)

		default:
			zap.L().Error("unsupported var type found in query builder query result", zap.Any("v", v), zap.String("colName", colName))
		}
	}
	if isValidPoint {
		return groupBy, groupAttributes, groupAttributesArray, &point
	}
	return groupBy, groupAttributes, groupAttributesArray, nil
}

func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string, countOfNumberCols int) ([]*v3.Series, error) {
	// when groupBy is applied, each combination of cartesian product
	// of attribute values is a separate series. Each item in seriesToPoints
	// represent a unique series where the key is sorted attribute values joined
	// by "," and the value is the list of points for that series

	// For instance, group by (serviceName, operation)
	// with two services and three operations in each will result in (maximum of) 6 series
	// ("frontend", "order") x ("/fetch", "/fetch/{Id}", "/order")
	//
	// ("frontend", "/fetch")
	// ("frontend", "/fetch/{Id}")
	// ("frontend", "/order")
	// ("order", "/fetch")
	// ("order", "/fetch/{Id}")
	// ("order", "/order")
	seriesToPoints := make(map[string][]v3.Point)
	var keys []string
	// seriesToAttrs is a mapping of key to a map of attribute key to attribute value
	// for each series. This is used to populate the series' attributes
	// For instance, for the above example, the seriesToAttrs will be
	// {
	//   "frontend,/fetch": {"serviceName": "frontend", "operation": "/fetch"},
	//   "frontend,/fetch/{Id}": {"serviceName": "frontend", "operation": "/fetch/{Id}"},
	//   "frontend,/order": {"serviceName": "frontend", "operation": "/order"},
	//   "order,/fetch": {"serviceName": "order", "operation": "/fetch"},
	//   "order,/fetch/{Id}": {"serviceName": "order", "operation": "/fetch/{Id}"},
	//   "order,/order": {"serviceName": "order", "operation": "/order"},
	// }
	seriesToAttrs := make(map[string]map[string]string)
	labelsArray := make(map[string][]map[string]string)
	for rows.Next() {
		if err := rows.Scan(vars...); err != nil {
			return nil, err
		}
		groupBy, groupAttributes, groupAttributesArray, metricPoint := readRow(vars, columnNames, countOfNumberCols)
		// skip the point if the value is NaN or Inf
		// are they ever useful enough to be returned?
		if metricPoint != nil && (math.IsNaN(metricPoint.Value) || math.IsInf(metricPoint.Value, 0)) {
			continue
		}
		sort.Strings(groupBy)
		key := strings.Join(groupBy, "")
		if _, exists := seriesToAttrs[key]; !exists {
			keys = append(keys, key)
		}
		seriesToAttrs[key] = groupAttributes
		labelsArray[key] = groupAttributesArray
		if metricPoint != nil {
			seriesToPoints[key] = append(seriesToPoints[key], *metricPoint)
		}
	}

	var seriesList []*v3.Series
	for _, key := range keys {
		points := seriesToPoints[key]
		series := v3.Series{Labels: seriesToAttrs[key], Points: points, LabelsArray: labelsArray[key]}
		seriesList = append(seriesList, &series)
	}
	return seriesList, getPersonalisedError(rows.Err())
}

func logCommentKVs(ctx context.Context) map[string]string {
	kv := ctx.Value(common.LogCommentKey)
	if kv == nil {
		return nil
	}
	logCommentKVs, ok := kv.(map[string]string)
	if !ok {
		return nil
	}
	return logCommentKVs
}

// GetTimeSeriesResultV3 runs the query and returns list of time series
func (r *ClickHouseReader) GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) {

	ctxArgs := map[string]interface{}{"query": query}
	for k, v := range logCommentKVs(ctx) {
		ctxArgs[k] = v
	}

	defer utils.Elapsed("GetTimeSeriesResultV3", ctxArgs)()

	// Hook up query progress reporting if requested.
	queryId := ctx.Value("queryId")
	if queryId != nil {
		qid, ok := queryId.(string)
		if !ok {
			zap.L().Error("GetTimeSeriesResultV3: queryId in ctx not a string as expected", zap.Any("queryId", queryId))

		} else {
			ctx = clickhouse.Context(ctx, clickhouse.WithProgress(
				func(p *clickhouse.Progress) {
					go func() {
						err := r.queryProgressTracker.ReportQueryProgress(qid, p)
						if err != nil {
							zap.L().Error(
								"Couldn't report query progress",
								zap.String("queryId", qid), zap.Error(err),
							)
						}
					}()
				},
			))
		}
	}

	rows, err := r.db.Query(ctx, query)

	if err != nil {
		zap.L().Error("error while reading time series result", zap.Error(err))
		return nil, errors.New(err.Error())
	}
	defer rows.Close()

	var (
		columnTypes = rows.ColumnTypes()
		columnNames = rows.Columns()
		vars        = make([]interface{}, len(columnTypes))
	)
	var countOfNumberCols int

	for i := range columnTypes {
		vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
		switch columnTypes[i].ScanType().Kind() {
		case reflect.Float32,
			reflect.Float64,
			reflect.Uint,
			reflect.Uint8,
			reflect.Uint16,
			reflect.Uint32,
			reflect.Uint64,
			reflect.Int,
			reflect.Int8,
			reflect.Int16,
			reflect.Int32,
			reflect.Int64:
			countOfNumberCols++
		}
	}

	return readRowsForTimeSeriesResult(rows, vars, columnNames, countOfNumberCols)
}

// GetListResultV3 runs the query and returns list of rows
func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error) {

	ctxArgs := map[string]interface{}{"query": query}
	for k, v := range logCommentKVs(ctx) {
		ctxArgs[k] = v
	}

	defer utils.Elapsed("GetListResultV3", ctxArgs)()

	rows, err := r.db.Query(ctx, query)

	if err != nil {
		zap.L().Error("error while reading time series result", zap.Error(err))
		return nil, errors.New(err.Error())
	}
	defer rows.Close()

	var (
		columnTypes = rows.ColumnTypes()
		columnNames = rows.Columns()
	)

	var rowList []*v3.Row

	for rows.Next() {
		var vars = make([]interface{}, len(columnTypes))
		for i := range columnTypes {
			vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
		}
		if err := rows.Scan(vars...); err != nil {
			return nil, err
		}
		row := map[string]interface{}{}
		var t time.Time
		for idx, v := range vars {
			if columnNames[idx] == "timestamp" {
				switch v := v.(type) {
				case *uint64:
					t = time.Unix(0, int64(*v))
				case *time.Time:
					t = *v
				}
			} else if columnNames[idx] == "timestamp_datetime" {
				t = *v.(*time.Time)
			} else if columnNames[idx] == "events" {
				var events []map[string]interface{}
				eventsFromDB, ok := v.(*[]string)
				if !ok {
					continue
				}
				for _, event := range *eventsFromDB {
					var eventMap map[string]interface{}
					json.Unmarshal([]byte(event), &eventMap)
					events = append(events, eventMap)
				}
				row[columnNames[idx]] = events
			} else {
				row[columnNames[idx]] = v
			}
		}

		// remove duplicate _ attributes for logs.
		// remove this function after a month
		removeDuplicateUnderscoreAttributes(row)

		rowList = append(rowList, &v3.Row{Timestamp: t, Data: row})
	}

	return rowList, getPersonalisedError(rows.Err())

}

func getPersonalisedError(err error) error {
	if err == nil {
		return nil
	}
	zap.L().Error("error while reading result", zap.Error(err))
	if strings.Contains(err.Error(), "code: 307") {
		return chErrors.ErrResourceBytesLimitExceeded
	}

	if strings.Contains(err.Error(), "code: 159") {
		return chErrors.ErrResourceTimeLimitExceeded
	}
	return err
}

func removeDuplicateUnderscoreAttributes(row map[string]interface{}) {
	if val, ok := row["attributes_int64"]; ok {
		attributes := val.(*map[string]int64)
		for key := range *attributes {
			if strings.Contains(key, ".") {
				uKey := strings.ReplaceAll(key, ".", "_")
				delete(*attributes, uKey)
			}
		}

	}

	if val, ok := row["attributes_float64"]; ok {
		attributes := val.(*map[string]float64)
		for key := range *attributes {
			if strings.Contains(key, ".") {
				uKey := strings.ReplaceAll(key, ".", "_")
				delete(*attributes, uKey)
			}
		}

	}

	if val, ok := row["attributes_bool"]; ok {
		attributes := val.(*map[string]bool)
		for key := range *attributes {
			if strings.Contains(key, ".") {
				uKey := strings.ReplaceAll(key, ".", "_")
				delete(*attributes, uKey)
			}
		}

	}
	for _, k := range []string{"attributes_string", "resources_string"} {
		if val, ok := row[k]; ok {
			attributes := val.(*map[string]string)
			for key := range *attributes {
				if strings.Contains(key, ".") {
					uKey := strings.ReplaceAll(key, ".", "_")
					delete(*attributes, uKey)
				}
			}

		}
	}
}
func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error {
	rows, err := r.db.Query(ctx, "SELECT 1")
	if err != nil {
		return err
	}
	defer rows.Close()

	return nil
}

func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
	var query string
	var err error
	var rows driver.Rows
	var response v3.AggregateAttributeResponse
	var stringAllowed bool

	where := ""
	switch req.Operator {
	case
		v3.AggregateOperatorCountDistinct,
		v3.AggregateOperatorCount:
		where = "tag_key ILIKE $1"
		stringAllowed = true
	case
		v3.AggregateOperatorRateSum,
		v3.AggregateOperatorRateMax,
		v3.AggregateOperatorRateAvg,
		v3.AggregateOperatorRate,
		v3.AggregateOperatorRateMin,
		v3.AggregateOperatorP05,
		v3.AggregateOperatorP10,
		v3.AggregateOperatorP20,
		v3.AggregateOperatorP25,
		v3.AggregateOperatorP50,
		v3.AggregateOperatorP75,
		v3.AggregateOperatorP90,
		v3.AggregateOperatorP95,
		v3.AggregateOperatorP99,
		v3.AggregateOperatorAvg,
		v3.AggregateOperatorSum,
		v3.AggregateOperatorMin,
		v3.AggregateOperatorMax:
		where = "tag_key ILIKE $1 AND tag_data_type='float64'"
		stringAllowed = false
	case
		v3.AggregateOperatorNoOp:
		return &v3.AggregateAttributeResponse{}, nil
	default:
		return nil, fmt.Errorf("unsupported aggregate operator")
	}
	query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type FROM %s.%s WHERE %s and tag_type != 'spanfield'", r.TraceDB, r.spanAttributeTableV2, where)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	statements := []model.ShowCreateTableStatement{}
	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName)
	err = r.db.Select(ctx, &statements, query)
	if err != nil {
		return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error())
	}

	var tagKey string
	var dataType string
	var tagType string
	for rows.Next() {
		if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		key := v3.AttributeKey{
			Key:      tagKey,
			DataType: v3.AttributeKeyDataType(dataType),
			Type:     v3.AttributeKeyType(tagType),
			IsColumn: isColumn(statements[0].Statement, tagType, tagKey, dataType),
		}

		if _, ok := constants.DeprecatedStaticFieldsTraces[tagKey]; !ok {
			response.AttributeKeys = append(response.AttributeKeys, key)
		}
	}

	fields := constants.NewStaticFieldsTraces
	// add the new static fields
	for _, field := range fields {
		if (!stringAllowed && field.DataType == v3.AttributeKeyDataTypeString) || (v3.AttributeKey{} == field) {
			continue
		} else if len(req.SearchText) == 0 || strings.Contains(field.Key, req.SearchText) {
			response.AttributeKeys = append(response.AttributeKeys, field)
		}
	}

	return &response, nil
}

func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {

	var query string
	var err error
	var rows driver.Rows
	var response v3.FilterAttributeKeyResponse

	tagTypeFilter := `tag_type != 'spanfield'`
	if req.TagType != "" {
		tagTypeFilter = fmt.Sprintf(`tag_type != 'spanfield' and tag_type = '%s'`, req.TagType)
	}

	query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type FROM %s.%s WHERE tag_key ILIKE $1 and %s LIMIT $2", r.TraceDB, r.spanAttributeTableV2, tagTypeFilter)

	rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	statements := []model.ShowCreateTableStatement{}
	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName)
	err = r.db.Select(ctx, &statements, query)
	if err != nil {
		return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error())
	}

	var tagKey string
	var dataType string
	var tagType string
	for rows.Next() {
		if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		key := v3.AttributeKey{
			Key:      tagKey,
			DataType: v3.AttributeKeyDataType(dataType),
			Type:     v3.AttributeKeyType(tagType),
			IsColumn: isColumn(statements[0].Statement, tagType, tagKey, dataType),
		}

		// don't send deprecated static fields
		// this is added so that once the old tenants are moved to new schema,
		// they old attributes are not sent to the frontend autocomplete
		if _, ok := constants.DeprecatedStaticFieldsTraces[tagKey]; !ok {
			response.AttributeKeys = append(response.AttributeKeys, key)
		}
	}

	// remove this later just to have NewStaticFieldsTraces in the response
	fields := constants.NewStaticFieldsTraces
	// add the new static fields only when the tagType is not specified
	// i.e retrieve all attributes
	if req.TagType == "" {
		for _, f := range fields {
			if (v3.AttributeKey{} == f) {
				continue
			}
			if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) {
				response.AttributeKeys = append(response.AttributeKeys, f)
			}
		}
	}

	return &response, nil
}

func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
	var query string
	var filterValueColumn string
	var err error
	var rows driver.Rows
	var attributeValues v3.FilterAttributeValueResponse

	// if dataType or tagType is not present return empty response
	if len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 {
		// add data type if it's a top level key
		if k, ok := constants.StaticFieldsTraces[req.FilterAttributeKey]; ok {
			req.FilterAttributeKeyDataType = k.DataType
		} else {
			return &v3.FilterAttributeValueResponse{}, nil
		}
	}

	// if data type is bool, return true and false
	if req.FilterAttributeKeyDataType == v3.AttributeKeyDataTypeBool {
		return &v3.FilterAttributeValueResponse{
			BoolAttributeValues: []bool{true, false},
		}, nil
	}

	query = "SELECT DISTINCT"
	switch req.FilterAttributeKeyDataType {
	case v3.AttributeKeyDataTypeFloat64:
		filterValueColumn = "number_value"
	case v3.AttributeKeyDataTypeString:
		filterValueColumn = "string_value"
	}

	searchText := fmt.Sprintf("%%%s%%", req.SearchText)

	// check if the tagKey is a topLevelColumn
	// here we are using StaticFieldsTraces instead of NewStaticFieldsTraces as we want to consider old columns as well.
	if _, ok := constants.StaticFieldsTraces[req.FilterAttributeKey]; ok {
		// query the column for the last 48 hours
		filterValueColumnWhere := req.FilterAttributeKey
		selectKey := req.FilterAttributeKey
		if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
			filterValueColumnWhere = fmt.Sprintf("toString(%s)", req.FilterAttributeKey)
			selectKey = fmt.Sprintf("toInt64(%s)", req.FilterAttributeKey)
		}

		// TODO(nitya): remove 24 hour limit in future after checking the perf/resource implications
		where := "timestamp >= toDateTime64(now() - INTERVAL 48 HOUR, 9) AND ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR))"
		query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE %s AND %s ILIKE $1 LIMIT $2", selectKey, r.TraceDB, r.traceTableName, where, filterValueColumnWhere)
		rows, err = r.db.Query(ctx, query, searchText, req.Limit)
	} else {
		filterValueColumnWhere := filterValueColumn
		if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
			filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn)
		}
		query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND %s ILIKE $2 AND tag_type=$3 LIMIT $4", filterValueColumn, r.TraceDB, r.spanAttributeTableV2, filterValueColumnWhere)
		rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit)
	}

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	var strAttributeValue string
	var float64AttributeValue sql.NullFloat64
	for rows.Next() {
		switch req.FilterAttributeKeyDataType {
		case v3.AttributeKeyDataTypeFloat64:
			if err := rows.Scan(&float64AttributeValue); err != nil {
				return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
			}
			if float64AttributeValue.Valid {
				attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, float64AttributeValue.Float64)
			}
		case v3.AttributeKeyDataTypeString:
			if err := rows.Scan(&strAttributeValue); err != nil {
				return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
			}
			attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue)
		}
	}

	if req.IncludeRelated {
		relatedValues, _ := r.FetchRelatedValues(ctx, req)
		attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{
			StringAttributeValues: relatedValues,
		}
	}

	return &attributeValues, nil
}

func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) {
	var query string
	var err error
	var rows driver.Rows
	response := map[string]v3.AttributeKey{}

	query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable)

	rows, err = r.db.Query(ctx, query)
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, fmt.Errorf("error while executing query: %s", err.Error())
	}
	defer rows.Close()

	statements := []model.ShowCreateTableStatement{}
	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceTableName)
	err = r.db.Select(ctx, &statements, query)
	if err != nil {
		return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error())
	}

	var tagKey string
	var dataType string
	var tagType string
	for rows.Next() {
		if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
		}
		key := v3.AttributeKey{
			Key:      tagKey,
			DataType: v3.AttributeKeyDataType(dataType),
			Type:     v3.AttributeKeyType(tagType),
			IsColumn: isColumn(statements[0].Statement, tagType, tagKey, dataType),
		}

		name := tagKey + "##" + tagType + "##" + strings.ToLower(dataType)
		response[name] = key
	}

	for _, key := range constants.StaticFieldsTraces {
		name := key.Key + "##" + key.Type.String() + "##" + strings.ToLower(key.DataType.String())
		response[name] = key
	}

	return response, nil
}

func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *model.LogsLiveTailClientV2) {
	if timestampStart == 0 {
		timestampStart = uint64(time.Now().UnixNano())
	} else {
		timestampStart = uint64(utils.GetEpochNanoSecs(int64(timestampStart)))
	}

	ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			done := true
			client.Done <- &done
			zap.L().Debug("closing go routine : " + client.Name)
			return
		case <-ticker.C:
			// get the new 100 logs as anything more older won't make sense
			var tmpQuery string
			bucketStart := (timestampStart / NANOSECOND) - 1800

			// we have to form the query differently if the resource filters are used
			if strings.Contains(query, r.logsResourceTableV2) {
				tmpQuery = fmt.Sprintf("seen_at_ts_bucket_start >=%d)) AND ts_bucket_start >=%d AND timestamp >=%d", bucketStart, bucketStart, timestampStart)
			} else {
				tmpQuery = fmt.Sprintf("ts_bucket_start >=%d AND timestamp >=%d", bucketStart, timestampStart)
			}
			if idStart != "" {
				tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart)
			}

			// the reason we are doing desc is that we need the latest logs first
			tmpQuery = query + tmpQuery + " order by timestamp desc, id desc limit 100"

			// using the old structure since we can directly read it to the struct as use it.
			response := []model.SignozLogV2{}
			err := r.db.Select(ctx, &response, tmpQuery)
			if err != nil {
				zap.L().Error("Error while getting logs", zap.Error(err))
				client.Error <- err
				return
			}
			for i := len(response) - 1; i >= 0; i-- {
				client.Logs <- &response[i]
				if i == 0 {
					timestampStart = response[i].Timestamp
					idStart = response[i].ID
				}
			}
		}
	}
}

func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *model.LogsLiveTailClient) {
	if timestampStart == 0 {
		timestampStart = uint64(time.Now().UnixNano())
	} else {
		timestampStart = uint64(utils.GetEpochNanoSecs(int64(timestampStart)))
	}

	ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			done := true
			client.Done <- &done
			zap.L().Debug("closing go routine : " + client.Name)
			return
		case <-ticker.C:
			// get the new 100 logs as anything more older won't make sense
			tmpQuery := fmt.Sprintf("timestamp >='%d'", timestampStart)
			if idStart != "" {
				tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart)
			}
			// the reason we are doing desc is that we need the latest logs first
			tmpQuery = query + tmpQuery + " order by timestamp desc, id desc limit 100"

			// using the old structure since we can directly read it to the struct as use it.
			response := []model.SignozLog{}
			err := r.db.Select(ctx, &response, tmpQuery)
			if err != nil {
				zap.L().Error("Error while getting logs", zap.Error(err))
				client.Error <- err
				return
			}
			for i := len(response) - 1; i >= 0; i-- {
				client.Logs <- &response[i]
				if i == 0 {
					timestampStart = response[i].Timestamp
					idStart = response[i].ID
				}
			}
		}
	}
}

func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHistory []model.RuleStateHistory) error {
	var statement driver.Batch
	var err error

	defer func() {
		if statement != nil {
			statement.Abort()
		}
	}()

	statement, err = r.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (rule_id, rule_name, overall_state, overall_state_changed, state, state_changed, unix_milli, labels, fingerprint, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
		signozHistoryDBName, ruleStateHistoryTableName))

	if err != nil {
		return err
	}

	for _, history := range ruleStateHistory {
		err = statement.Append(history.RuleID, history.RuleName, history.OverallState, history.OverallStateChanged, history.State, history.StateChanged, history.UnixMilli, history.Labels, history.Fingerprint, history.Value)
		if err != nil {
			return err
		}
	}

	err = statement.Send()
	if err != nil {
		return err
	}
	return nil
}

func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]model.RuleStateHistory, error) {
	query := fmt.Sprintf("SELECT * FROM %s.%s WHERE rule_id = '%s' AND state_changed = true ORDER BY unix_milli DESC LIMIT 1 BY fingerprint",
		signozHistoryDBName, ruleStateHistoryTableName, ruleID)

	history := []model.RuleStateHistory{}
	err := r.db.Select(ctx, &history, query)
	if err != nil {
		return nil, err
	}
	return history, nil
}

func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
	ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*model.RuleStateTimeline, error) {

	var conditions []string

	conditions = append(conditions, fmt.Sprintf("rule_id = '%s'", ruleID))

	conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", params.Start, params.End))

	if params.State != "" {
		conditions = append(conditions, fmt.Sprintf("state = '%s'", params.State))
	}

	if params.Filters != nil && len(params.Filters.Items) != 0 {
		for _, item := range params.Filters.Items {
			toFormat := item.Value
			op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
			if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
				toFormat = fmt.Sprintf("%%%s%%", toFormat)
			}
			fmtVal := utils.ClickHouseFormattedValue(toFormat)
			switch op {
			case v3.FilterOperatorEqual:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorNotEqual:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorIn:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorNotIn:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorLike:
				conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
			case v3.FilterOperatorNotLike:
				conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
			case v3.FilterOperatorRegex:
				conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
			case v3.FilterOperatorNotRegex:
				conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
			case v3.FilterOperatorGreaterThan:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorGreaterThanOrEq:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorLessThan:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorLessThanOrEq:
				conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
			case v3.FilterOperatorContains:
				conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
			case v3.FilterOperatorNotContains:
				conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
			case v3.FilterOperatorExists:
				conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
			case v3.FilterOperatorNotExists:
				conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
			default:
				return nil, fmt.Errorf("unsupported filter operator")
			}
		}
	}
	whereClause := strings.Join(conditions, " AND ")

	query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s ORDER BY unix_milli %s LIMIT %d OFFSET %d",
		signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset)

	history := []model.RuleStateHistory{}
	zap.L().Debug("rule state history query", zap.String("query", query))
	err := r.db.Select(ctx, &history, query)
	if err != nil {
		zap.L().Error("Error while reading rule state history", zap.Error(err))
		return nil, err
	}

	var total uint64
	zap.L().Debug("rule state history total query", zap.String("query", fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
		signozHistoryDBName, ruleStateHistoryTableName, whereClause)))
	err = r.db.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
		signozHistoryDBName, ruleStateHistoryTableName, whereClause)).Scan(&total)
	if err != nil {
		return nil, err
	}

	labelsQuery := fmt.Sprintf("SELECT DISTINCT labels FROM %s.%s WHERE rule_id = $1",
		signozHistoryDBName, ruleStateHistoryTableName)
	rows, err := r.db.Query(ctx, labelsQuery, ruleID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	labelsMap := make(map[string][]string)
	for rows.Next() {
		var rawLabel string
		err = rows.Scan(&rawLabel)
		if err != nil {
			return nil, err
		}
		label := map[string]string{}
		err = json.Unmarshal([]byte(rawLabel), &label)
		if err != nil {
			return nil, err
		}
		for k, v := range label {
			labelsMap[k] = append(labelsMap[k], v)
		}
	}

	timeline := &model.RuleStateTimeline{
		Items:  history,
		Total:  total,
		Labels: labelsMap,
	}

	return timeline, nil
}

func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
	ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) {
	query := fmt.Sprintf(`SELECT
		fingerprint,
		any(labels) as labels,
		count(*) as count
	FROM %s.%s
	WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d
	GROUP BY fingerprint
	HAVING labels != '{}'
	ORDER BY count DESC`,
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

	zap.L().Debug("rule state history top contributors query", zap.String("query", query))
	contributors := []model.RuleStateHistoryContributor{}
	err := r.db.Select(ctx, &contributors, query)
	if err != nil {
		zap.L().Error("Error while reading rule state history", zap.Error(err))
		return nil, err
	}

	return contributors, nil
}

func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.ReleStateItem, error) {

	tmpl := `WITH firing_events AS (
    SELECT
        rule_id,
        state,
        unix_milli AS firing_time
    FROM %s.%s
    WHERE overall_state = '` + model.StateFiring.String() + `' 
      AND overall_state_changed = true
      AND rule_id IN ('%s')
	  AND unix_milli >= %d AND unix_milli <= %d
),
resolution_events AS (
    SELECT
        rule_id,
        state,
        unix_milli AS resolution_time
    FROM %s.%s
    WHERE overall_state = '` + model.StateInactive.String() + `' 
      AND overall_state_changed = true
      AND rule_id IN ('%s')
	  AND unix_milli >= %d AND unix_milli <= %d
),
matched_events AS (
    SELECT
        f.rule_id,
        f.state,
        f.firing_time,
        MIN(r.resolution_time) AS resolution_time
    FROM firing_events f
    LEFT JOIN resolution_events r
        ON f.rule_id = r.rule_id
    WHERE r.resolution_time > f.firing_time
    GROUP BY f.rule_id, f.state, f.firing_time
)
SELECT *
FROM matched_events
ORDER BY firing_time ASC;`

	query := fmt.Sprintf(tmpl,
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)

	zap.L().Debug("overall state transitions query", zap.String("query", query))

	transitions := []model.RuleStateTransition{}
	err := r.db.Select(ctx, &transitions, query)
	if err != nil {
		return nil, err
	}

	stateItems := []model.ReleStateItem{}

	for idx, item := range transitions {
		start := item.FiringTime
		end := item.ResolutionTime
		stateItems = append(stateItems, model.ReleStateItem{
			State: item.State,
			Start: start,
			End:   end,
		})
		if idx < len(transitions)-1 {
			nextStart := transitions[idx+1].FiringTime
			if nextStart > end {
				stateItems = append(stateItems, model.ReleStateItem{
					State: model.StateInactive,
					Start: end,
					End:   nextStart,
				})
			}
		}
	}

	// fetch the most recent overall_state from the table
	var state model.AlertState
	stateQuery := fmt.Sprintf("SELECT state FROM %s.%s WHERE rule_id = '%s' AND unix_milli <= %d ORDER BY unix_milli DESC LIMIT 1",
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.End)
	if err := r.db.QueryRow(ctx, stateQuery).Scan(&state); err != nil {
		if err != sql.ErrNoRows {
			return nil, err
		}
		state = model.StateInactive
	}

	if len(transitions) == 0 {
		// no transitions found, it is either firing or inactive for whole time range
		stateItems = append(stateItems, model.ReleStateItem{
			State: state,
			Start: params.Start,
			End:   params.End,
		})
	} else {
		// there were some transitions, we need to add the last state at the end
		if state == model.StateInactive {
			stateItems = append(stateItems, model.ReleStateItem{
				State: model.StateInactive,
				Start: transitions[len(transitions)-1].ResolutionTime,
				End:   params.End,
			})
		} else {
			// fetch the most recent firing event from the table in the given time range
			var firingTime int64
			firingQuery := fmt.Sprintf(`
			SELECT
				unix_milli
			FROM %s.%s
			WHERE rule_id = '%s' AND overall_state_changed = true AND overall_state = '%s' AND unix_milli <= %d
			ORDER BY unix_milli DESC LIMIT 1`, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.End)
			if err := r.db.QueryRow(ctx, firingQuery).Scan(&firingTime); err != nil {
				return nil, err
			}
			stateItems = append(stateItems, model.ReleStateItem{
				State: model.StateInactive,
				Start: transitions[len(transitions)-1].ResolutionTime,
				End:   firingTime,
			})
			stateItems = append(stateItems, model.ReleStateItem{
				State: model.StateFiring,
				Start: firingTime,
				End:   params.End,
			})
		}
	}
	return stateItems, nil
}

func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (float64, error) {

	tmpl := `
WITH firing_events AS (
    SELECT
        rule_id,
        state,
        unix_milli AS firing_time
    FROM %s.%s
    WHERE overall_state = '` + model.StateFiring.String() + `' 
      AND overall_state_changed = true
      AND rule_id IN ('%s')
	  AND unix_milli >= %d AND unix_milli <= %d
),
resolution_events AS (
    SELECT
        rule_id,
        state,
        unix_milli AS resolution_time
    FROM %s.%s
    WHERE overall_state = '` + model.StateInactive.String() + `' 
      AND overall_state_changed = true
      AND rule_id IN ('%s')
	  AND unix_milli >= %d AND unix_milli <= %d
),
matched_events AS (
    SELECT
        f.rule_id,
        f.state,
        f.firing_time,
        MIN(r.resolution_time) AS resolution_time
    FROM firing_events f
    LEFT JOIN resolution_events r
        ON f.rule_id = r.rule_id
    WHERE r.resolution_time > f.firing_time
    GROUP BY f.rule_id, f.state, f.firing_time
)
SELECT AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time
FROM matched_events;
`

	query := fmt.Sprintf(tmpl,
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)

	zap.L().Debug("avg resolution time query", zap.String("query", query))
	var avgResolutionTime float64
	err := r.db.QueryRow(ctx, query).Scan(&avgResolutionTime)
	if err != nil {
		return 0, err
	}

	return avgResolutionTime, nil
}

func (r *ClickHouseReader) GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*v3.Series, error) {

	step := common.MinAllowedStepInterval(params.Start, params.End)

	tmpl := `
WITH firing_events AS (
    SELECT
        rule_id,
        state,
        unix_milli AS firing_time
    FROM %s.%s
    WHERE overall_state = '` + model.StateFiring.String() + `' 
      AND overall_state_changed = true
      AND rule_id IN ('%s')
	  AND unix_milli >= %d AND unix_milli <= %d
),
resolution_events AS (
    SELECT
        rule_id,
        state,
        unix_milli AS resolution_time
    FROM %s.%s
    WHERE overall_state = '` + model.StateInactive.String() + `' 
      AND overall_state_changed = true
      AND rule_id IN ('%s')
	  AND unix_milli >= %d AND unix_milli <= %d
),
matched_events AS (
    SELECT
        f.rule_id,
        f.state,
        f.firing_time,
        MIN(r.resolution_time) AS resolution_time
    FROM firing_events f
    LEFT JOIN resolution_events r
        ON f.rule_id = r.rule_id
    WHERE r.resolution_time > f.firing_time
    GROUP BY f.rule_id, f.state, f.firing_time
)
SELECT toStartOfInterval(toDateTime(firing_time / 1000), INTERVAL %d SECOND) AS ts, AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time
FROM matched_events
GROUP BY ts
ORDER BY ts ASC;`

	query := fmt.Sprintf(tmpl,
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, step)

	zap.L().Debug("avg resolution time by interval query", zap.String("query", query))
	result, err := r.GetTimeSeriesResultV3(ctx, query)
	if err != nil || len(result) == 0 {
		return nil, err
	}

	return result[0], nil
}

func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (uint64, error) {
	query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d",
		signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

	var totalTriggers uint64

	err := r.db.QueryRow(ctx, query).Scan(&totalTriggers)
	if err != nil {
		return 0, err
	}

	return totalTriggers, nil
}

func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*v3.Series, error) {
	step := common.MinAllowedStepInterval(params.Start, params.End)

	query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
		step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

	result, err := r.GetTimeSeriesResultV3(ctx, query)
	if err != nil || len(result) == 0 {
		return nil, err
	}

	return result[0], nil
}

func (r *ClickHouseReader) GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error) {
	var minTime, maxTime time.Time

	query := fmt.Sprintf("SELECT min(timestamp), max(timestamp) FROM %s.%s WHERE traceID IN ('%s')",
		r.TraceDB, r.SpansTable, strings.Join(traceID, "','"))

	zap.L().Debug("GetMinAndMaxTimestampForTraceID", zap.String("query", query))

	err := r.db.QueryRow(ctx, query).Scan(&minTime, &maxTime)
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return 0, 0, err
	}

	// return current time if traceID not found
	if minTime.IsZero() || maxTime.IsZero() {
		zap.L().Debug("minTime or maxTime is zero, traceID not found")
		return time.Now().UnixNano(), time.Now().UnixNano(), nil
	}

	zap.L().Debug("GetMinAndMaxTimestampForTraceID", zap.Any("minTime", minTime), zap.Any("maxTime", maxTime))

	return minTime.UnixNano(), maxTime.UnixNano(), nil
}

func (r *ClickHouseReader) ReportQueryStartForProgressTracking(
	queryId string,
) (func(), *model.ApiError) {
	return r.queryProgressTracker.ReportQueryStarted(queryId)
}

func (r *ClickHouseReader) SubscribeToQueryProgress(
	queryId string,
) (<-chan model.QueryProgress, func(), *model.ApiError) {
	return r.queryProgressTracker.SubscribeToQueryProgress(queryId)
}

func (r *ClickHouseReader) GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError) {
	var rows driver.Rows
	var response []v3.AttributeKey
	query := fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE unix_milli >= $1 GROUP BY tagKeys) WHERE distinctTagKey ILIKE $2 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, common.PastDayRoundOff(), fmt.Sprintf("%%%s%%", req.SearchText)) //only showing past day data
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	var attributeKey string
	for rows.Next() {
		if err := rows.Scan(&attributeKey); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		if skipDotNames && strings.Contains(attributeKey, ".") {
			continue
		}
		key := v3.AttributeKey{
			Key:      attributeKey,
			DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
			Type:     v3.AttributeKeyTypeTag,
			IsColumn: false,
		}
		response = append(response, key)
	}
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return &response, nil
}

func (r *ClickHouseReader) GetAllMetricFilterAttributeValues(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
	var query string
	var err error
	var rows driver.Rows
	var attributeValues []string

	query = fmt.Sprintf("SELECT JSONExtractString(labels, $1) AS tagValue FROM %s.%s WHERE JSONExtractString(labels, $2) ILIKE $3 AND unix_milli >= $4 GROUP BY tagValue", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err = r.db.Query(valueCtx, query, req.FilterKey, req.FilterKey, fmt.Sprintf("%%%s%%", req.SearchText), common.PastDayRoundOff()) //only showing past day data

	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	var atrributeValue string
	for rows.Next() {
		if err := rows.Scan(&atrributeValue); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		attributeValues = append(attributeValues, atrributeValue)
	}
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return attributeValues, nil
}

func (r *ClickHouseReader) GetAllMetricFilterUnits(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
	var rows driver.Rows
	var response []string
	query := fmt.Sprintf("SELECT DISTINCT unit FROM %s.%s WHERE unit ILIKE $1 AND unit IS NOT NULL ORDER BY unit", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}

	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, fmt.Sprintf("%%%s%%", req.SearchText))
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	var attributeKey string
	for rows.Next() {
		if err := rows.Scan(&attributeKey); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		response = append(response, attributeKey)
	}
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return response, nil
}
func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
	var rows driver.Rows
	var response []string
	query := fmt.Sprintf("SELECT DISTINCT type FROM %s.%s WHERE type ILIKE $1 AND type IS NOT NULL ORDER BY type", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, fmt.Sprintf("%%%s%%", req.SearchText))
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	var attributeKey string
	for rows.Next() {
		if err := rows.Scan(&attributeKey); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		response = append(response, attributeKey)
	}
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return response, nil
}

func (r *ClickHouseReader) GetMetricsDataPoints(ctx context.Context, metricName string) (uint64, *model.ApiError) {
	query := fmt.Sprintf(`SELECT 
    sum(count) as data_points
FROM %s.%s
WHERE metric_name = ?
`, signozMetricDBName, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME)
	var dataPoints uint64
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.QueryRow(valueCtx, query, metricName).Scan(&dataPoints)
	if err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return dataPoints, nil // Convert to uint64 before returning
}

func (r *ClickHouseReader) GetMetricsLastReceived(ctx context.Context, metricName string) (int64, *model.ApiError) {
	query := fmt.Sprintf(`SELECT 
    MAX(unix_milli) AS last_received_time
FROM %s.%s
WHERE metric_name = ?
`, signozMetricDBName, signozSamplesAgg30mLocalTableName)
	var lastReceived int64
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.QueryRow(valueCtx, query, metricName).Scan(&lastReceived)
	if err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	query = fmt.Sprintf(`SELECT 
    MAX(unix_milli) AS last_received_time
FROM %s.%s
WHERE metric_name = ? and unix_milli > ?
`, signozMetricDBName, signozSampleTableName)
	var finalLastReceived int64
	err = r.db.QueryRow(valueCtx, query, metricName, lastReceived).Scan(&finalLastReceived)
	if err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return finalLastReceived, nil // Convert to uint64 before returning
}

func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) {
	query := fmt.Sprintf(`SELECT 
    uniq(fingerprint) AS timeSeriesCount
FROM %s.%s
WHERE metric_name = ?;`, signozMetricDBName, signozTSTableNameV41Week)
	var timeSeriesCount uint64
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.QueryRow(valueCtx, query, metricName).Scan(&timeSeriesCount)
	if err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return timeSeriesCount, nil
}

func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metricName string, start, end *int64, filters *v3.FilterSet) (*[]metrics_explorer.Attribute, *model.ApiError) {
	whereClause := ""
	if filters != nil {
		conditions, _ := utils.BuildFilterConditions(filters, "t")
		if conditions != nil {
			whereClause = "AND " + strings.Join(conditions, " AND ")
		}
	}
	const baseQueryTemplate = `
SELECT 
    kv.1 AS key,
    arrayMap(x -> trim(BOTH '"' FROM x), groupUniqArray(1000)(kv.2)) AS values,
    length(groupUniqArray(10000)(kv.2)) AS valueCount
FROM %s.%s
ARRAY JOIN arrayFilter(x -> NOT startsWith(x.1, '__'), JSONExtractKeysAndValuesRaw(labels)) AS kv
WHERE metric_name = ? AND __normalized=true %s`

	var args []interface{}
	args = append(args, metricName)
	tableName := signozTSTableNameV41Week

	if start != nil && end != nil {
		st, en, tsTable, _ := utils.WhichTSTableToUse(*start, *end)
		*start, *end, tableName = st, en, tsTable
		args = append(args, *start, *end)
	} else if start == nil && end == nil {
		tableName = signozTSTableNameV41Week
	}

	query := fmt.Sprintf(baseQueryTemplate, signozMetricDBName, tableName, whereClause)

	if start != nil && end != nil {
		query += " AND unix_milli BETWEEN ? AND ?"
	}

	query += "\nGROUP BY kv.1\nORDER BY valueCount DESC;"

	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, args...)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	var attributesList []metrics_explorer.Attribute
	for rows.Next() {
		var attr metrics_explorer.Attribute
		if err := rows.Scan(&attr.Key, &attr.Value, &attr.ValueCount); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		attributesList = append(attributesList, attr)
	}

	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	return &attributesList, nil
}

func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError) {
	milli := time.Now().Add(-duration).UnixMilli()
	query := fmt.Sprintf("SELECT uniq(fingerprint) FROM %s.%s WHERE metric_name = '%s' and unix_milli >= ?", signozMetricDBName, signozTSTableNameV4, metricName)
	var timeSeries uint64
	// Using QueryRow instead of Select since we're only expecting a single value
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.QueryRow(valueCtx, query, milli).Scan(&timeSeries)
	if err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return timeSeries, nil
}

func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.UUID, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
	var args []interface{}

	// Build filter conditions (if any)
	conditions, _ := utils.BuildFilterConditions(&req.Filters, "t")
	whereClause := ""
	if conditions != nil {
		whereClause = "AND " + strings.Join(conditions, " AND ")
	}

	firstQueryLimit := req.Limit
	samplesOrder := false
	var orderByClauseFirstQuery string
	if req.OrderBy.ColumnName == "samples" {
		samplesOrder = true
		orderByClauseFirstQuery = fmt.Sprintf("ORDER BY timeseries %s", req.OrderBy.Order)
		if req.Limit < 50 {
			firstQueryLimit = 50
		}
	} else {
		orderByClauseFirstQuery = fmt.Sprintf("ORDER BY %s %s", req.OrderBy.ColumnName, req.OrderBy.Order)
	}

	// Determine which tables to use
	start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.End)
	sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.End)

	metricsQuery := fmt.Sprintf(
		`SELECT 
		    t.metric_name AS metric_name,
		    ANY_VALUE(t.description) AS description,
		    ANY_VALUE(t.type) AS metric_type,
		    ANY_VALUE(t.unit) AS metric_unit,
		    uniq(t.fingerprint) AS timeseries,
			uniq(metric_name) OVER() AS total
		FROM %s.%s AS t
		WHERE unix_milli BETWEEN ? AND ?
		AND NOT startsWith(metric_name, 'signoz_')
		AND __normalized = true
		%s
		GROUP BY t.metric_name
		%s
		LIMIT %d OFFSET %d;`,
		signozMetricDBName, tsTable, whereClause, orderByClauseFirstQuery, firstQueryLimit, req.Offset)

	args = append(args, start, end)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	begin := time.Now()
	rows, err := r.db.Query(valueCtx, metricsQuery, args...)
	queryDuration := time.Since(begin)
	zap.L().Info("Time taken to execute metrics query to fetch metrics with high time series", zap.String("query", metricsQuery), zap.Any("args", args), zap.Duration("duration", queryDuration))
	if err != nil {
		zap.L().Error("Error executing metrics query", zap.Error(err))
		return &metrics_explorer.SummaryListMetricsResponse{}, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	var response metrics_explorer.SummaryListMetricsResponse
	var metricNames []string

	for rows.Next() {
		var metric metrics_explorer.MetricDetail
		if err := rows.Scan(&metric.MetricName, &metric.Description, &metric.MetricType, &metric.MetricUnit, &metric.TimeSeries, &response.Total); err != nil {
			zap.L().Error("Error scanning metric row", zap.Error(err))
			return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		metricNames = append(metricNames, metric.MetricName)
		response.Metrics = append(response.Metrics, metric)
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over metric rows", zap.Error(err))
		return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// If no metrics were found, return early.
	if len(metricNames) == 0 {
		return &response, nil
	}

	// Build a comma-separated list of quoted metric names.
	metricsList := "'" + strings.Join(metricNames, "', '") + "'"
	// If samples are being sorted by datapoints, update the ORDER clause.
	if samplesOrder {
		orderByClauseFirstQuery = fmt.Sprintf("ORDER BY s.samples %s", req.OrderBy.Order)
	} else {
		orderByClauseFirstQuery = ""
	}
	args = make([]interface{}, 0)
	var sampleQuery string
	var sb strings.Builder

	if whereClause != "" {
		sb.WriteString(fmt.Sprintf(
			`SELECT 
				s.samples,
				s.metric_name
			FROM (
				SELECT 
					dm.metric_name,
					%s AS samples
				FROM %s.%s AS dm
				WHERE dm.metric_name IN (%s)
				AND dm.fingerprint IN (
					SELECT fingerprint
					FROM %s.%s
					WHERE metric_name IN (%s)
					AND __normalized = true
					AND unix_milli BETWEEN ? AND ?
					%s
					GROUP BY fingerprint
				)
				AND dm.unix_milli BETWEEN ? AND ?
				GROUP BY dm.metric_name
			) AS s `,
			countExp,
			signozMetricDBName, sampleTable,
			metricsList,
			signozMetricDBName, localTsTable,
			metricsList,
			whereClause,
		))
		args = append(args, start, end)
		args = append(args, req.Start, req.End)
	} else {
		// If no filters, it is a simpler query.
		sb.WriteString(fmt.Sprintf(
			`SELECT 
        s.samples,
        s.metric_name
    FROM (
        SELECT 
            metric_name,
            %s AS samples
        FROM %s.%s
        WHERE metric_name IN (%s)
        AND unix_milli BETWEEN ? AND ?
        GROUP BY metric_name
    ) AS s `,
			countExp,
			signozMetricDBName, sampleTable,
			metricsList))
		args = append(args, req.Start, req.End)
	}

	// Append ORDER BY clause if provided.
	if orderByClauseFirstQuery != "" {
		sb.WriteString(orderByClauseFirstQuery + " ")
	}

	// Append LIMIT clause.
	sb.WriteString(fmt.Sprintf("LIMIT %d;", req.Limit))
	sampleQuery = sb.String()
	begin = time.Now()
	rows, err = r.db.Query(valueCtx, sampleQuery, args...)
	queryDuration = time.Since(begin)
	zap.L().Info("Time taken to execute list summary query", zap.String("query", sampleQuery), zap.Any("args", args), zap.Duration("duration", queryDuration))
	if err != nil {
		zap.L().Error("Error executing samples query", zap.Error(err))
		return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	samplesMap := make(map[string]uint64)

	for rows.Next() {
		var samples uint64
		var metricName string
		if err := rows.Scan(&samples, &metricName); err != nil {
			zap.L().Error("Error scanning sample row", zap.Error(err))
			return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		samplesMap[metricName] = samples
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over sample rows", zap.Error(err))
		return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	//get updated metrics data
	batch, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
	if apiError != nil {
		zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
	}

	var filteredMetrics []metrics_explorer.MetricDetail
	for i := range response.Metrics {
		if updatedMetrics, exists := batch[response.Metrics[i].MetricName]; exists {
			response.Metrics[i].MetricType = string(updatedMetrics.MetricType)
			if updatedMetrics.Unit != "" {
				response.Metrics[i].MetricUnit = updatedMetrics.Unit
			}
			if updatedMetrics.Description != "" {
				response.Metrics[i].Description = updatedMetrics.Description
			}
		}
		if samples, exists := samplesMap[response.Metrics[i].MetricName]; exists {
			response.Metrics[i].Samples = samples
			filteredMetrics = append(filteredMetrics, response.Metrics[i])
		}
	}
	response.Metrics = filteredMetrics

	// If ordering by samples, sort in-memory.
	if samplesOrder {
		sort.Slice(response.Metrics, func(i, j int) bool {
			return response.Metrics[i].Samples > response.Metrics[j].Samples
		})
	}

	return &response, nil
}

func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
	var args []interface{}

	// Build filters dynamically
	conditions, _ := utils.BuildFilterConditions(&req.Filters, "")
	whereClause := ""
	if len(conditions) > 0 {
		whereClause = "AND " + strings.Join(conditions, " AND ")
	}
	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.End)

	// Construct the query without backticks
	query := fmt.Sprintf(`
		SELECT 
			metric_name,
			total_value,
			(total_value * 100.0 / total_time_series) AS percentage
		FROM (
			SELECT 
					metric_name,
					uniq(fingerprint) AS total_value,
					(SELECT uniq(fingerprint) 
					 FROM %s.%s 
					 WHERE unix_milli BETWEEN ? AND ? AND __normalized = true) AS total_time_series
				FROM %s.%s
				WHERE unix_milli BETWEEN ? AND ? AND NOT startsWith(metric_name, 'signoz_') AND __normalized = true %s
				GROUP BY metric_name
			)
			ORDER BY percentage DESC
			LIMIT %d;`,
		signozMetricDBName,
		tsTable,
		signozMetricDBName,
		tsTable,
		whereClause,
		req.Limit,
	)

	args = append(args,
		start, end, // For total_time_series subquery
		start, end, // For main query
	)

	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	begin := time.Now()
	rows, err := r.db.Query(valueCtx, query, args...)
	duration := time.Since(begin)
	zap.L().Info("Time taken to execute time series percentage query", zap.String("query", query), zap.Any("args", args), zap.Duration("duration", duration))
	if err != nil {
		zap.L().Error("Error executing time series percentage query", zap.Error(err), zap.String("query", query))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	var treemap []metrics_explorer.TreeMapResponseItem
	for rows.Next() {
		var item metrics_explorer.TreeMapResponseItem
		if err := rows.Scan(&item.MetricName, &item.TotalValue, &item.Percentage); err != nil {
			zap.L().Error("Error scanning row", zap.Error(err))
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		treemap = append(treemap, item)
	}

	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over rows", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	return &treemap, nil
}

func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {

	conditions, _ := utils.BuildFilterConditions(&req.Filters, "ts")
	whereClause := ""
	if conditions != nil {
		whereClause = "AND " + strings.Join(conditions, " AND ")
	}

	// Determine time range and tables to use
	start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.End)
	sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.End)

	queryLimit := 50 + req.Limit
	metricsQuery := fmt.Sprintf(
		`SELECT 
		    ts.metric_name AS metric_name,
		    uniq(ts.fingerprint) AS timeSeries
		FROM %s.%s AS ts
		WHERE NOT startsWith(ts.metric_name, 'signoz_')
		AND __normalized = true
		AND unix_milli BETWEEN ? AND ?
		%s
		GROUP BY ts.metric_name
		ORDER BY timeSeries DESC
		LIMIT %d;`,
		signozMetricDBName, tsTable, whereClause, queryLimit,
	)

	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	begin := time.Now()
	rows, err := r.db.Query(valueCtx, metricsQuery, start, end)
	duration := time.Since(begin)
	zap.L().Info("Time taken to execute samples percentage metric name query to reduce search space", zap.String("query", metricsQuery), zap.Any("start", start), zap.Any("end", end), zap.Duration("duration", duration))
	if err != nil {
		zap.L().Error("Error executing samples percentage query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	// Process the query results
	var metricNames []string
	for rows.Next() {
		var metricName string
		var timeSeries uint64
		if err := rows.Scan(&metricName, &timeSeries); err != nil {
			zap.L().Error("Error scanning metric row", zap.Error(err))
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		metricNames = append(metricNames, metricName)
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over metric rows", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	// If no metrics found, return early
	if len(metricNames) == 0 {
		return nil, nil
	}

	// Format metric names for query
	metricsList := "'" + strings.Join(metricNames, "', '") + "'"

	// Build optimized query with JOIN but `unix_milli` filter only on the sample table
	var sb strings.Builder
	sb.WriteString(fmt.Sprintf(
		`WITH TotalSamples AS (
			SELECT %s AS total_samples
			FROM %s.%s
			WHERE unix_milli BETWEEN ? AND ?
		)
		SELECT 
			s.samples,
			s.metric_name,
			COALESCE((s.samples * 100.0 / t.total_samples), 0) AS percentage
		FROM 
		(
			SELECT 
				dm.metric_name,
				%s AS samples
			FROM %s.%s AS dm`,
		countExp, signozMetricDBName, sampleTable, // Total samples
		countExp, signozMetricDBName, sampleTable, // Inner select samples
	))

	var args []interface{}
	args = append(args,
		req.Start, req.End, // For total_samples subquery
	)

	// Apply `unix_milli` filter **only** on the sample table (`dm`)
	sb.WriteString(` WHERE dm.unix_milli BETWEEN ? AND ?`)
	args = append(args, req.Start, req.End)

	// Use JOIN instead of IN (subquery) when additional filters exist
	if whereClause != "" {
		sb.WriteString(fmt.Sprintf(
			` AND dm.fingerprint IN (
				SELECT ts.fingerprint 
				FROM %s.%s AS ts
				WHERE ts.metric_name IN (%s)
				AND unix_milli BETWEEN ? AND ?
				AND __normalized = true
				%s
				GROUP BY ts.fingerprint
			)`,
			signozMetricDBName, localTsTable, metricsList, whereClause,
		))
		args = append(args, start, end)
	}

	// Apply metric filtering after all conditions
	sb.WriteString(fmt.Sprintf(
		` AND dm.metric_name IN (%s)
			GROUP BY dm.metric_name
		) AS s
		JOIN TotalSamples t ON 1 = 1
		ORDER BY percentage DESC
		LIMIT ?;`,
		metricsList,
	))
	args = append(args, req.Limit)
	sampleQuery := sb.String()

	// Add start and end time to args (only for sample table)

	begin = time.Now()
	// Execute the sample percentage query
	rows, err = r.db.Query(valueCtx, sampleQuery, args...)
	duration = time.Since(begin)
	zap.L().Info("Time taken to execute samples percentage query", zap.String("query", sampleQuery), zap.Any("args", args), zap.Duration("duration", duration))
	if err != nil {
		zap.L().Error("Error executing samples query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	// Process the results into a response slice
	var treemap []metrics_explorer.TreeMapResponseItem
	for rows.Next() {
		var item metrics_explorer.TreeMapResponseItem
		if err := rows.Scan(&item.TotalValue, &item.MetricName, &item.Percentage); err != nil {
			zap.L().Error("Error scanning row", zap.Error(err))
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		treemap = append(treemap, item)
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over sample rows", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	return &treemap, nil
}

func (r *ClickHouseReader) GetNameSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError) {
	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.End)

	query := fmt.Sprintf(`
		SELECT 
			metric_name,
			any(type) as type,
		    any(temporality) as temporality,
		    any(is_monotonic) as monotonic,
			1 - (levenshteinDistance(?, metric_name) / greatest(NULLIF(length(?), 0), NULLIF(length(metric_name), 0))) AS name_similarity
		FROM %s.%s
		WHERE metric_name != ?
		  AND unix_milli BETWEEN ? AND ?
		 AND NOT startsWith(metric_name, 'signoz_')
		AND __normalized = true
		GROUP BY metric_name
		ORDER BY name_similarity DESC
		LIMIT 30;`,
		signozMetricDBName, tsTable)

	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, req.CurrentMetricName, req.CurrentMetricName, req.CurrentMetricName, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	result := make(map[string]metrics_explorer.RelatedMetricsScore)
	for rows.Next() {
		var metric string
		var sim float64
		var metricType v3.MetricType
		var temporality v3.Temporality
		var isMonotonic bool
		if err := rows.Scan(&metric, &metricType, &temporality, &isMonotonic, &sim); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		result[metric] = metrics_explorer.RelatedMetricsScore{
			NameSimilarity: sim,
			MetricType:     metricType,
			Temporality:    temporality,
			IsMonotonic:    isMonotonic,
		}
	}

	return result, nil
}

func (r *ClickHouseReader) GetAttributeSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError) {
	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.End)

	// Get target labels
	extractedLabelsQuery := fmt.Sprintf(`
		SELECT 
			kv.1 AS label_key,
			topK(10)(JSONExtractString(kv.2)) AS label_values
		FROM %s.%s
		ARRAY JOIN JSONExtractKeysAndValuesRaw(labels) AS kv
		WHERE metric_name = ?
		  AND unix_milli between ? and ?
		  AND NOT startsWith(kv.1, '__')
		AND NOT startsWith(metric_name, 'signoz_')
		AND __normalized = true
		GROUP BY label_key
		LIMIT 50`, signozMetricDBName, tsTable)

	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, extractedLabelsQuery, req.CurrentMetricName, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	var targetKeys []string
	var targetValues []string
	for rows.Next() {
		var key string
		var value []string
		if err := rows.Scan(&key, &value); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		targetKeys = append(targetKeys, key)
		targetValues = append(targetValues, value...)
	}

	targetKeysList := "'" + strings.Join(targetKeys, "', '") + "'"
	targetValuesList := "'" + strings.Join(targetValues, "', '") + "'"

	var priorityList []string
	for _, f := range req.Filters.Items {
		if f.Operator == v3.FilterOperatorEqual {
			priorityList = append(priorityList, fmt.Sprintf("tuple('%s', '%s')", f.Key.Key, f.Value))
		}
	}
	priorityListString := strings.Join(priorityList, ", ")

	candidateLabelsQuery := fmt.Sprintf(`
		WITH 
			arrayDistinct([%s]) AS filter_keys,     
			arrayDistinct([%s]) AS filter_values,
			[%s] AS priority_pairs_input,
			%d AS priority_multiplier
		SELECT 
			metric_name,
			any(type) as type,
			any(temporality) as temporality,
			any(is_monotonic) as monotonic,
			SUM(
				arraySum(
					kv -> if(has(filter_keys, kv.1) AND has(filter_values, kv.2), 1, 0),
					JSONExtractKeysAndValues(labels, 'String')
				)
			)::UInt64 AS raw_match_count,
			SUM(
				arraySum(
					kv ->
						if(
							arrayExists(pr -> pr.1 = kv.1 AND pr.2 = kv.2, priority_pairs_input),
							priority_multiplier,
							0
						),
					JSONExtractKeysAndValues(labels, 'String')
				)
			)::UInt64 AS weighted_match_count,
		toJSONString(
			arrayDistinct(
				arrayFlatten(
					groupArray(
						arrayFilter(
							kv -> arrayExists(pr -> pr.1 = kv.1 AND pr.2 = kv.2, priority_pairs_input),
							JSONExtractKeysAndValues(labels, 'String')
						)
					)
				)
			)
		) AS priority_pairs
		FROM %s.%s
		WHERE rand() %% 100 < 10
		AND unix_milli between ? and ?
		AND NOT startsWith(metric_name, 'signoz_')
		AND __normalized = true
		GROUP BY metric_name
		ORDER BY weighted_match_count DESC, raw_match_count DESC
		LIMIT 30
		`,
		targetKeysList, targetValuesList, priorityListString, 2,
		signozMetricDBName, tsTable)

	rows, err = r.db.Query(valueCtx, candidateLabelsQuery, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	result := make(map[string]metrics_explorer.RelatedMetricsScore)
	attributeMap := make(map[string]uint64)

	for rows.Next() {
		var metric string
		var metricType v3.MetricType
		var temporality v3.Temporality
		var isMonotonic bool
		var weightedMatchCount, rawMatchCount uint64
		var priorityPairsJSON string

		if err := rows.Scan(&metric, &metricType, &temporality, &isMonotonic, &rawMatchCount, &weightedMatchCount, &priorityPairsJSON); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}

		attributeMap[metric] = weightedMatchCount + (rawMatchCount)/10
		var priorityPairs [][]string
		if err := json.Unmarshal([]byte(priorityPairsJSON), &priorityPairs); err != nil {
			priorityPairs = [][]string{}
		}

		result[metric] = metrics_explorer.RelatedMetricsScore{
			AttributeSimilarity: float64(attributeMap[metric]), // Will be normalized later
			Filters:             priorityPairs,
			MetricType:          metricType,
			Temporality:         temporality,
			IsMonotonic:         isMonotonic,
		}
	}

	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	// Normalize the attribute similarity scores
	normalizeMap := utils.NormalizeMap(attributeMap)
	for metric := range result {
		if score, exists := normalizeMap[metric]; exists {
			metricScore := result[metric]
			metricScore.AttributeSimilarity = score
			result[metric] = metricScore
		}
	}

	return result, nil
}

func (r *ClickHouseReader) GetMetricsAllResourceAttributes(ctx context.Context, start int64, end int64) (map[string]uint64, *model.ApiError) {
	start, end, attTable, _ := utils.WhichAttributesTableToUse(start, end)
	query := fmt.Sprintf(`SELECT 
    key, 
    count(distinct value) AS distinct_value_count
FROM (
    SELECT key, value
    FROM %s.%s
    ARRAY JOIN 
        arrayConcat(mapKeys(resource_attributes)) AS key,
        arrayConcat(mapValues(resource_attributes)) AS value
    WHERE unix_milli between ? and ?
) 
GROUP BY key
ORDER BY distinct_value_count DESC;`, signozMetadataDbName, attTable)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	attributes := make(map[string]uint64)
	for rows.Next() {
		var attrs string
		var uniqCount uint64

		if err := rows.Scan(&attrs, &uniqCount); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		attributes[attrs] = uniqCount
	}
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return attributes, nil
}

func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_explorer.InspectMetricsRequest, fingerprints []string) (*metrics_explorer.InspectMetricsResponse, *model.ApiError) {
	start, end, _, localTsTable := utils.WhichTSTableToUse(req.Start, req.End)
	fingerprintsString := strings.Join(fingerprints, ",")
	query := fmt.Sprintf(`SELECT
                fingerprint,
                labels,
                unix_milli,
                value as per_series_value
        FROM
                signoz_metrics.distributed_samples_v4
        INNER JOIN (
                SELECT DISTINCT
                        fingerprint,
                        labels
                FROM
                        %s.%s
                WHERE
                        fingerprint in (%s)
                        AND unix_milli >= ?
                        AND unix_milli < ?) as filtered_time_series
                USING fingerprint
        WHERE
                metric_name  = ?
                AND unix_milli >= ?
                AND unix_milli < ?
                ORDER BY fingerprint DESC, unix_milli DESC`, signozMetricDBName, localTsTable, fingerprintsString)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, start, end, req.MetricName, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	seriesMap := make(map[uint64]*v3.Series)

	for rows.Next() {
		var fingerprint uint64
		var labelsJSON string
		var unixMilli int64
		var perSeriesValue float64

		if err := rows.Scan(&fingerprint, &labelsJSON, &unixMilli, &perSeriesValue); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}

		var labelsMap map[string]string
		if err := json.Unmarshal([]byte(labelsJSON), &labelsMap); err != nil {
			return nil, &model.ApiError{Typ: "JsonUnmarshalError", Err: err}
		}

		// Filter out keys starting with "__"
		filteredLabelsMap := make(map[string]string)
		for k, v := range labelsMap {
			if !strings.HasPrefix(k, "__") {
				filteredLabelsMap[k] = v
			}
		}

		var labelsArray []map[string]string
		for k, v := range filteredLabelsMap {
			labelsArray = append(labelsArray, map[string]string{k: v})
		}

		// Check if we already have a Series for this fingerprint.
		series, exists := seriesMap[fingerprint]
		if !exists {
			series = &v3.Series{
				Labels:      filteredLabelsMap,
				LabelsArray: labelsArray,
				Points:      []v3.Point{},
			}
			seriesMap[fingerprint] = series
		}

		series.Points = append(series.Points, v3.Point{
			Timestamp: unixMilli,
			Value:     perSeriesValue,
		})
	}

	if err = rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	var seriesList []v3.Series
	for _, s := range seriesMap {
		seriesList = append(seriesList, *s)
	}

	return &metrics_explorer.InspectMetricsResponse{
		Series: &seriesList,
	}, nil
}

func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
	// Build dynamic key selections and JSON extracts
	var jsonExtracts []string
	var groupBys []string

	for i, attr := range attributes {
		keyAlias := fmt.Sprintf("key%d", i+1)
		jsonExtracts = append(jsonExtracts, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", attr, keyAlias))
		groupBys = append(groupBys, keyAlias)
	}

	conditions, _ := utils.BuildFilterConditions(&req.Filters, "")
	whereClause := ""
	if len(conditions) > 0 {
		whereClause = "AND " + strings.Join(conditions, " AND ")
	}

	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.End)
	query := fmt.Sprintf(`
        SELECT 
    arrayDistinct(groupArray(toString(fingerprint))) AS fingerprints
FROM
(
    SELECT 
        metric_name, labels, fingerprint,
        %s
    FROM %s.%s
    WHERE metric_name = ?
      AND unix_milli BETWEEN ? AND ?
    %s
)
GROUP BY %s
ORDER BY length(fingerprints) DESC, rand()
LIMIT 40`, // added rand to get diff value every time we run this query
		strings.Join(jsonExtracts, ", "),
		signozMetricDBName, tsTable,
		whereClause,
		strings.Join(groupBys, ", "))
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query,
		req.MetricName,
		start,
		end,
	)
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
	}
	defer rows.Close()

	var fingerprints []string
	for rows.Next() {
		// Create dynamic scanning based on number of attributes
		var batch []string

		if err := rows.Scan(&batch); err != nil {
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
		}

		remaining := 40 - len(fingerprints)
		if remaining <= 0 {
			break
		}

		// if this batch would overshoot, only take as many as we need
		if len(batch) > remaining {
			fingerprints = append(fingerprints, batch[:remaining]...)
			break
		}

		// otherwise take the whole batch and keep going
		fingerprints = append(fingerprints, batch...)

	}

	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
	}

	return fingerprints, nil
}

func (r *ClickHouseReader) DeleteMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricName string) *model.ApiError {
	delQuery := fmt.Sprintf(`ALTER TABLE %s.%s DELETE WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataLocalTable)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.Exec(valueCtx, delQuery, metricName)
	if err != nil {
		return &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	r.cache.Delete(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+metricName)
	return nil
}

func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, orgID valuer.UUID, req *model.UpdateMetricsMetadata) *model.ApiError {
	if req.MetricType == v3.MetricTypeHistogram {
		labels := []string{"le"}
		hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, labels)
		if apiError != nil {
			return apiError
		}
		if !hasLabels {
			return &model.ApiError{
				Typ: model.ErrorBadData,
				Err: fmt.Errorf("metric '%s' cannot be set as histogram type", req.MetricName),
			}
		}
	}

	if req.MetricType == v3.MetricTypeSummary {
		labels := []string{"quantile"}
		hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, labels)
		if apiError != nil {
			return apiError
		}
		if !hasLabels {
			return &model.ApiError{
				Typ: model.ErrorBadData,
				Err: fmt.Errorf("metric '%s' cannot be set as summary type", req.MetricName),
			}
		}
	}

	apiErr := r.DeleteMetricsMetadata(ctx, orgID, req.MetricName)
	if apiErr != nil {
		return apiErr
	}
	insertQuery := fmt.Sprintf(`INSERT INTO %s.%s (metric_name, temporality, is_monotonic, type, description, unit, created_at)
VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.Exec(valueCtx, insertQuery, req.MetricName, req.Temporality, req.IsMonotonic, req.MetricType, req.Description, req.Unit, req.CreatedAt.UnixMilli())
	if err != nil {
		return &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	err = r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1)
	if err != nil {
		return &model.ApiError{Typ: "CachingErr", Err: err}
	}
	return nil
}

func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricName string, labels []string) (bool, *model.ApiError) {
	if len(labels) == 0 {
		return true, nil
	}

	conditions := "metric_name = ?"
	for range labels {
		conditions += " AND JSONHas(labels, ?) = 1"
	}

	query := fmt.Sprintf(`
        SELECT count(*) > 0 as has_le
        FROM %s.%s
        WHERE %s
        LIMIT 1`, signozMetricDBName, signozTSTableNameV41Day, conditions)

	args := make([]interface{}, 0, len(labels)+1)
	args = append(args, metricName)
	for _, label := range labels {
		args = append(args, label)
	}

	var hasLE bool
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.QueryRow(valueCtx, query, args...).Scan(&hasLE)
	if err != nil {
		return false, &model.ApiError{
			Typ: "ClickHouseError",
			Err: fmt.Errorf("error checking summary labels: %v", err),
		}
	}
	return hasLE, nil
}

func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context, orgID valuer.UUID) []error {
	var allMetricsMetadata []model.UpdateMetricsMetadata
	var errorList []error
	// Fetch all rows from ClickHouse
	query := fmt.Sprintf(`SELECT metric_name, type, description , temporality, is_monotonic, unit
		FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	err := r.db.Select(valueCtx, &allMetricsMetadata, query)
	if err != nil {
		errorList = append(errorList, err)
		return errorList
	}
	for _, m := range allMetricsMetadata {
		err := r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
		if err != nil {
			errorList = append(errorList, err)
		}
	}

	return errorList
}

func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
	cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
	var missingMetrics []string

	// First, try retrieving each metric from cache.
	for _, metricName := range metricNames {
		metadata := new(model.UpdateMetricsMetadata)
		cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
		err := r.cache.Get(ctx, orgID, cacheKey, metadata, true)
		if err == nil {
			cachedMetadata[metricName] = metadata
		} else {
			if err != nil {
				zap.L().Error("Error retrieving metrics metadata from cache", zap.String("metric_name", metricName), zap.Error(err))
			}
			missingMetrics = append(missingMetrics, metricName)
		}
	}

	// If there are any metrics missing in the cache, query them from the database.
	if len(missingMetrics) > 0 {
		// Join the missing metric names; ensure proper quoting if needed.
		metricList := "'" + strings.Join(metricNames, "', '") + "'"
		query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
			FROM %s.%s 
			WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList)

		valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
		rows, err := r.db.Query(valueCtx, query)
		if err != nil {
			return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metrics metadata: %v", err)}
		}
		defer rows.Close()

		for rows.Next() {
			metadata := new(model.UpdateMetricsMetadata)
			if err := rows.Scan(
				&metadata.MetricName,
				&metadata.MetricType,
				&metadata.Description,
				&metadata.Temporality,
				&metadata.IsMonotonic,
				&metadata.Unit,
			); err != nil {
				return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)}
			}

			// Cache the result for future requests.
			cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
			if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
				zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
			}
			cachedMetadata[metadata.MetricName] = metadata
		}
	}

	return cachedMetadata, nil
}

func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
	searchSpansResult := []model.SearchSpansResult{
		{
			Columns:   []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
			IsSubTree: false,
			Events:    make([][]interface{}, 0),
		},
	}

	var traceSummary model.TraceSummary
	summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable)
	err := r.db.QueryRow(ctx, summaryQuery, params.TraceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans)
	if err != nil {
		if err == sql.ErrNoRows {
			return &searchSpansResult, nil
		}
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, fmt.Errorf("error in processing sql query")
	}

	if traceSummary.NumSpans > uint64(params.MaxSpansInTrace) {
		zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace),
			zap.Uint64("Count", traceSummary.NumSpans))
		claims, errv2 := authtypes.ClaimsFromContext(ctx)
		if errv2 == nil {
			data := map[string]interface{}{
				"traceSize":            traceSummary.NumSpans,
				"maxSpansInTraceLimit": params.MaxSpansInTrace,
				"algo":                 "smart",
			}
			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, claims.Email, true, false)
		}
		return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details")
	}

	claims, errv2 := authtypes.ClaimsFromContext(ctx)
	if errv2 == nil {
		data := map[string]interface{}{
			"traceSize": traceSummary.NumSpans,
			"algo":      "smart",
		}
		telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, claims.Email, true, false)
	}

	var startTime, endTime, durationNano uint64
	var searchScanResponses []model.SpanItemV2

	query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3", r.TraceDB, r.traceTableName)

	start := time.Now()

	err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10))

	zap.L().Info(query)

	if err != nil {
		zap.L().Error("Error in processing sql query", zap.Error(err))
		return nil, fmt.Errorf("error in processing sql query")
	}
	end := time.Now()
	zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start)))

	searchSpansResult[0].Events = make([][]interface{}, len(searchScanResponses))

	searchSpanResponses := []model.SearchSpanResponseItem{}
	start = time.Now()
	for _, item := range searchScanResponses {
		ref := []model.OtelSpanRef{}
		err := json.Unmarshal([]byte(item.References), &ref)
		if err != nil {
			zap.L().Error("Error unmarshalling references", zap.Error(err))
			return nil, err
		}

		// merge attributes_number and attributes_bool to attributes_string
		for k, v := range item.Attributes_bool {
			item.Attributes_string[k] = fmt.Sprintf("%v", v)
		}
		for k, v := range item.Attributes_number {
			item.Attributes_string[k] = strconv.FormatFloat(v, 'f', -1, 64)
		}
		for k, v := range item.Resources_string {
			item.Attributes_string[k] = v
		}

		jsonItem := model.SearchSpanResponseItem{
			SpanID:           item.SpanID,
			TraceID:          item.TraceID,
			ServiceName:      item.ServiceName,
			Name:             item.Name,
			Kind:             int32(item.Kind),
			DurationNano:     int64(item.DurationNano),
			HasError:         item.HasError,
			StatusMessage:    item.StatusMessage,
			StatusCodeString: item.StatusCodeString,
			SpanKind:         item.SpanKind,
			References:       ref,
			Events:           item.Events,
			TagMap:           item.Attributes_string,
		}

		jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)

		searchSpanResponses = append(searchSpanResponses, jsonItem)
		if startTime == 0 || jsonItem.TimeUnixNano < startTime {
			startTime = jsonItem.TimeUnixNano
		}
		if endTime == 0 || jsonItem.TimeUnixNano > endTime {
			endTime = jsonItem.TimeUnixNano
		}
		if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano {
			durationNano = uint64(jsonItem.DurationNano)
		}
	}
	end = time.Now()
	zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start)))

	if len(searchScanResponses) > params.SpansRenderLimit {
		start = time.Now()
		searchSpansResult, err = smart.SmartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)
		if err != nil {
			return nil, err
		}
		end = time.Now()
		zap.L().Debug("smartTraceAlgo took: ", zap.Duration("duration", end.Sub(start)))
		claims, errv2 := authtypes.ClaimsFromContext(ctx)
		if errv2 == nil {
			data := map[string]interface{}{
				"traceSize":        len(searchScanResponses),
				"spansRenderLimit": params.SpansRenderLimit,
				"algo":             "smart",
			}
			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LARGE_TRACE_OPENED, data, claims.Email, true, false)
		}
	} else {
		for i, item := range searchSpanResponses {
			spanEvents := item.GetValues()
			searchSpansResult[0].Events[i] = spanEvents
		}
	}

	searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000)
	searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000)

	return &searchSpansResult, nil
}
